1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9pub(crate) fn lashlang_host_environment_from_tool_catalog(
10 catalog: &crate::ToolCatalog,
11 abilities: lashlang::LashlangAbilities,
12 language_features: lashlang::LashlangLanguageFeatures,
13 host_resources: lashlang::LashlangHostCatalog,
14) -> lashlang::LashlangHostEnvironment {
15 let mut resources = lashlang_resources_from_tool_catalog(catalog);
16 resources.extend(host_resources);
17 lashlang::LashlangHostEnvironment::new(resources, abilities)
18 .with_language_features(language_features)
19}
20
21pub(crate) fn lashlang_resources_from_tool_catalog(
22 catalog: &crate::ToolCatalog,
23) -> lashlang::LashlangHostCatalog {
24 let mut host_catalog = lashlang::LashlangHostCatalog::new();
25 for entry in catalog.tools.iter() {
26 if entry.availability.is_callable() {
27 let lashlang_binding = entry
28 .manifest
29 .lashlang_binding
30 .executable_for(&entry.manifest.name);
31 host_catalog.add_module_operation(
32 lashlang_binding.module_path.iter().map(String::as_str),
33 lashlang_binding.authority_type.clone(),
34 lashlang_binding.operation.clone(),
35 entry.manifest.name.clone(),
36 lashlang::TypeExpr::Any,
37 lashlang::TypeExpr::Any,
38 );
39 }
40 }
41 host_catalog
42}
43
44#[derive(Clone)]
45pub struct RuntimeExecutionContext<'run> {
46 pub(super) session_id: String,
47 pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
48 lashlang_abilities: lashlang::LashlangAbilities,
49 lashlang_language_features: lashlang::LashlangLanguageFeatures,
50 lashlang_host_environment: lashlang::LashlangHostEnvironment,
51 lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
52 attachment_store: Arc<dyn crate::AttachmentStore>,
53 chronological_projection: Arc<crate::ChronologicalProjection>,
54 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
55 turn_context: crate::TurnContext,
56 execution_env_spec: crate::ProcessExecutionEnvSpec,
57 process_originator: Option<crate::ProcessOriginator>,
58 pub(super) runtime_process_id: Option<String>,
59 process_env_ref: Option<crate::ProcessExecutionEnvRef>,
60 process_wake_target: Option<crate::SessionScope>,
61 pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
62 lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
63 lashlang_execution_context: lash_trace::TraceContext,
64 turn_phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
65 pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
66 pub(super) cancellation_token: Option<CancellationToken>,
67 started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
72}
73
74impl<'run> RuntimeExecutionContext<'run> {
75 pub(crate) fn drain_tool_trigger_outcomes(
76 &self,
77 ) -> Result<Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>, crate::PluginError> {
78 self.dispatch
79 .trigger_outcomes
80 .drain()
81 .map_err(crate::PluginError::Session)
82 }
83
84 pub(super) fn process_scope(
85 &self,
86 parent_invocation: Option<crate::RuntimeInvocation>,
87 ) -> crate::ProcessOpScope<'_> {
88 crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
89 .with_parent_invocation(parent_invocation)
90 .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
91 }
92
93 #[allow(
94 clippy::too_many_arguments,
95 reason = "code execution bridge carries explicit per-turn runtime dependencies"
96 )]
97 pub(crate) fn new(
98 session_id: String,
99 dispatch: Arc<ToolDispatchContext<'run>>,
100 lashlang_abilities: lashlang::LashlangAbilities,
101 lashlang_language_features: lashlang::LashlangLanguageFeatures,
102 lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
103 attachment_store: Arc<dyn crate::AttachmentStore>,
104 chronological_projection: Arc<crate::ChronologicalProjection>,
105 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
106 turn_context: crate::TurnContext,
107 ) -> Self {
108 let lashlang_host_environment = lashlang_host_environment_from_tool_catalog(
109 &dispatch.tool_catalog,
110 lashlang_abilities,
111 lashlang_language_features,
112 dispatch.plugins.lashlang_resources(),
113 );
114 Self {
115 session_id,
116 dispatch,
117 lashlang_abilities,
118 lashlang_language_features,
119 lashlang_host_environment,
120 lashlang_artifact_store,
121 attachment_store,
122 chronological_projection,
123 protocol_extension,
124 turn_context,
125 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
126 crate::PluginOptions::default(),
127 crate::SessionPolicy::default(),
128 ),
129 process_originator: None,
130 runtime_process_id: None,
131 started_process_ids: Arc::default(),
132 process_env_ref: None,
133 process_wake_target: None,
134 parent_invocation: None,
135 lashlang_execution_sink: None,
136 lashlang_execution_context: lash_trace::TraceContext::default(),
137 turn_phase_probe: None,
138 turn_event_tx: None,
139 cancellation_token: None,
140 }
141 }
142
143 pub fn session_id(&self) -> &str {
144 &self.session_id
145 }
146
147 pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
148 Arc::clone(&self.attachment_store)
149 }
150
151 pub async fn put_lashlang_module_artifact(
152 &self,
153 artifact: &lashlang::ModuleArtifact,
154 ) -> Result<(), String> {
155 self.lashlang_artifact_store
156 .put_module_artifact(artifact)
157 .await
158 .map_err(|err| err.to_string())
159 }
160
161 pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
162 Arc::clone(&self.chronological_projection)
163 }
164
165 pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
166 self.protocol_extension
167 .as_ref()
168 .and_then(|extension| extension.as_any().downcast_ref::<T>())
169 }
170
171 pub fn turn_context(&self) -> &crate::TurnContext {
172 &self.turn_context
173 }
174
175 pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
176 self.dispatch.session_graph.as_ref()
177 }
178
179 pub(super) async fn emit_turn_activity(
180 &self,
181 correlation_id: TurnActivityId,
182 event: TurnEvent,
183 ) {
184 if let Some(tx) = &self.turn_event_tx {
185 let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
186 }
187 }
188
189 pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
190 self.turn_event_tx = Some(turn_event_tx);
191 self
192 }
193
194 pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
195 self.parent_invocation = Some(metadata);
196 self
197 }
198
199 pub(crate) fn with_execution_env_spec(
200 mut self,
201 execution_env_spec: crate::ProcessExecutionEnvSpec,
202 ) -> Self {
203 self.execution_env_spec = execution_env_spec;
204 self
205 }
206
207 pub(crate) fn with_process_registration_context(
208 mut self,
209 registration: &crate::ProcessRegistration,
210 ) -> Self {
211 self.process_originator = Some(registration.provenance.originator.clone());
212 self.runtime_process_id = Some(registration.id.clone());
213 self.process_env_ref = registration.env_ref.clone();
214 self.process_wake_target = registration.wake_target.clone();
215 self
216 }
217
218 pub(super) fn record_started_process(&self, process_id: &str) {
222 self.started_process_ids
223 .lock()
224 .expect("started process ids lock")
225 .insert(process_id.to_string());
226 }
227
228 pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
229 self.started_process_ids
230 .lock()
231 .expect("started process ids lock")
232 .contains(process_id)
233 }
234
235 pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
236 self.process_originator
237 .clone()
238 .map(|originator| crate::ProcessSpawnProvenance {
239 originator,
240 wake_target: self.process_wake_target.clone(),
241 })
242 }
243
244 pub(super) async fn attach_captured_process_execution_env(
245 &self,
246 registration: crate::ProcessRegistration,
247 ) -> Result<crate::ProcessRegistration, crate::PluginError> {
248 if registration.env_ref.is_some() {
249 return Ok(registration);
250 }
251 match registration.input.as_ref() {
252 crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::LashlangProcess { .. } => {
253 let env_ref = self.captured_process_execution_env_ref().await?;
254 Ok(registration.with_execution_env_ref(Some(env_ref)))
255 }
256 crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
257 Ok(registration)
258 }
259 }
260 }
261
262 async fn captured_process_execution_env_ref(
263 &self,
264 ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
265 if let Some(env_ref) = self.process_env_ref.clone() {
266 return Ok(env_ref);
267 }
268 crate::persist_process_execution_env(
269 self.lashlang_artifact_store.as_ref(),
270 &self.execution_env_spec,
271 )
272 .await
273 }
274
275 pub(crate) fn with_lashlang_execution_trace(
276 mut self,
277 sink: Option<Arc<dyn lash_trace::TraceSink>>,
278 context: lash_trace::TraceContext,
279 ) -> Self {
280 self.lashlang_execution_sink = sink;
281 self.lashlang_execution_context = context;
282 self
283 }
284
285 pub(crate) fn with_turn_phase_probe(
286 mut self,
287 probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
288 ) -> Self {
289 self.turn_phase_probe = probe;
290 self
291 }
292
293 #[doc(hidden)]
294 pub fn named_phase(&self, phase: &'static str) -> crate::runtime::RuntimeNamedPhase {
295 crate::runtime::RuntimeNamedPhase::begin(self.turn_phase_probe.clone(), phase)
296 }
297
298 pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
299 self.parent_invocation.as_ref()
300 }
301
302 pub fn lashlang_execution_sink(&self) -> Option<Arc<dyn lash_trace::TraceSink>> {
303 self.lashlang_execution_sink.clone()
304 }
305
306 pub fn lashlang_execution_context(&self) -> &lash_trace::TraceContext {
307 &self.lashlang_execution_context
308 }
309
310 pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
311 self.cancellation_token = Some(cancellation_token);
312 self
313 }
314
315 pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
316 crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
317 }
318
319 pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
320 crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
321 }
322
323 pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
324 crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
325 }
326
327 pub fn resolve_lashlang_host_operation(
328 &self,
329 receiver: &lashlang::ResourceHandle,
330 operation: &str,
331 ) -> Result<String, String> {
332 self.lashlang_host_environment
333 .resources
334 .resolve_module_operation(&receiver.resource_type, &receiver.alias, operation)
335 .map(|binding| binding.host_operation.clone())
336 .ok_or_else(|| {
337 format!(
338 "module `{}` of type `{}` does not expose operation `{operation}`",
339 receiver.alias, receiver.resource_type
340 )
341 })
342 }
343
344 pub async fn prepare_lashlang_process_start(
345 &self,
346 start: lashlang::ProcessStart,
347 ) -> Result<(crate::ProcessRegistration, Option<String>), String> {
348 let _phase = self.named_phase("rlm_process.prepare_start");
349 let display_name = Some(start.process_name.clone());
350 let artifact = self
351 .lashlang_artifact_store
352 .get_module_artifact(&start.module_ref)
353 .await
354 .map_err(|err| format!("failed to load lashlang module artifact: {err}"))?
355 .ok_or_else(|| {
356 format!(
357 "missing lashlang module artifact `{}` for process `{}`",
358 start.module_ref, start.process_name
359 )
360 })?;
361 if artifact.host_requirements_ref != start.host_requirements_ref {
362 return Err(format!(
363 "lashlang module artifact `{}` host requirements mismatch: process requested {}, artifact has {}",
364 start.module_ref, start.host_requirements_ref, artifact.host_requirements_ref
365 ));
366 }
367 if artifact.process_ref(&start.process_name) != Some(&start.process_ref) {
368 return Err(format!(
369 "lashlang module artifact `{}` does not export process `{}` as requested ref {:?}",
370 start.module_ref, start.process_name, start.process_ref
371 ));
372 }
373 let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(start.args)))
374 .map_err(|err| format!("failed to serialize process args: {err}"))?
375 {
376 serde_json::Value::Object(map) => map,
377 _ => return Err("process args must serialize as a record".to_string()),
378 };
379 let signal_event_types = artifact
380 .canonical_ir
381 .process(&start.process_name)
382 .map(crate::lashlang_process_signal_event_types)
383 .unwrap_or_default();
384 let process_id = format!("process:{}", uuid::Uuid::new_v4());
385 let registration = crate::ProcessRegistration::session_start_draft(
386 process_id,
387 crate::ProcessInput::LashlangProcess {
388 module_ref: start.module_ref,
389 process_ref: start.process_ref,
390 host_requirements_ref: start.host_requirements_ref,
391 process_name: start.process_name,
392 args,
393 },
394 )
395 .with_extra_event_types(
396 crate::lashlang_process_event_types()
397 .into_iter()
398 .chain(signal_event_types),
399 );
400 Ok((registration, display_name))
401 }
402
403 pub fn lashlang_host_environment(&self) -> &lashlang::LashlangHostEnvironment {
404 &self.lashlang_host_environment
405 }
406
407 pub fn lashlang_abilities(&self) -> lashlang::LashlangAbilities {
408 self.lashlang_abilities
409 }
410
411 pub fn lashlang_language_features(&self) -> lashlang::LashlangLanguageFeatures {
412 self.lashlang_language_features
413 }
414
415 pub fn link_lashlang_module(
416 &self,
417 program: lashlang::Program,
418 ) -> Result<lashlang::LinkedModule, String> {
419 lashlang::LinkedModule::link(program, self.lashlang_host_environment())
420 .map_err(|err| err.to_string())
421 }
422
423 pub async fn perform_lashlang_trigger_operation(
424 &self,
425 operation: &str,
426 payload: serde_json::Value,
427 ) -> Result<serde_json::Value, String> {
428 match lashlang::TriggerHostOperation::from_host_operation(operation) {
429 Some(lashlang::TriggerHostOperation::Register) => self
430 .register_trigger_subscription(payload)
431 .await
432 .map_err(|err| err.to_string()),
433 Some(lashlang::TriggerHostOperation::List) => self
434 .list_trigger_subscriptions(payload)
435 .await
436 .map_err(|err| err.to_string()),
437 Some(lashlang::TriggerHostOperation::Cancel) => self
438 .cancel_trigger_subscription(payload)
439 .await
440 .map_err(|err| err.to_string()),
441 None => Err(format!("unknown trigger operation `{operation}`")),
442 }
443 }
444
445 async fn register_trigger_subscription(
446 &self,
447 payload: serde_json::Value,
448 ) -> Result<serde_json::Value, crate::PluginError> {
449 let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
450 crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
451 })?;
452 let request = lashlang::TriggerRegistrationRequest::decode(&payload)
453 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
454 let source_type = request.source.source_type.clone();
455 let source_value = request.source.value.clone();
456 let source = request.source.to_json();
457 let event_type = lashlang::event_type_for_source(
458 &self.dispatch.plugins.lashlang_resources(),
459 &source_type,
460 )
461 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
462 let validation = crate::plugin::validate_target_process(
463 &request.target,
464 &event_type,
465 &request.inputs,
466 self.lashlang_artifact_store.as_ref(),
467 )
468 .await?;
469 let store = router.store();
470 let source_key = store
471 .source_key_for_subscription(&source_type, &source_value)
472 .await?;
473 let env_ref = match self.process_env_ref.clone() {
474 Some(env_ref) => env_ref,
475 None => {
476 crate::persist_process_execution_env(
477 self.lashlang_artifact_store.as_ref(),
478 &self.execution_env_spec,
479 )
480 .await?
481 }
482 };
483 let registrant = self.process_originator.clone().unwrap_or_else(|| {
484 crate::ProcessOriginator::session(crate::SessionScope::new(self.session_id.clone()))
485 });
486 let wake_target = self
487 .process_wake_target
488 .clone()
489 .or_else(|| match ®istrant {
490 crate::ProcessOriginator::Session { scope } => Some(scope.clone()),
491 crate::ProcessOriginator::Host => None,
492 });
493 let record = store
494 .register_subscription(crate::TriggerSubscriptionDraft {
495 registrant,
496 env_ref,
497 wake_target,
498 name: request.name,
499 source_type,
500 source_key,
501 source,
502 event_ty: validation.event_ty,
503 module_ref: request.target.module_ref,
504 host_requirements_ref: request.target.host_requirements_ref,
505 process_ref: request.target.process_ref,
506 process_name: request.target.process_name,
507 input_template: validation.inputs,
508 })
509 .await?;
510 Ok(crate::plugin::trigger_handle_json(&record.handle))
511 }
512
513 async fn list_trigger_subscriptions(
514 &self,
515 payload: serde_json::Value,
516 ) -> Result<serde_json::Value, crate::PluginError> {
517 let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
518 crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
519 })?;
520 let request = lashlang::TriggerListRequest::decode(&payload)
521 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
522 let mut filter = crate::TriggerSubscriptionFilter::for_session(&self.session_id);
523 filter.target = request.target;
524 filter.name = request.name;
525 filter.source_type = request.source_type;
526 filter.enabled = request.enabled;
527 let registrations = router
528 .store()
529 .list_subscriptions(filter)
530 .await?
531 .iter()
532 .map(crate::TriggerRegistration::from)
533 .collect::<Vec<_>>();
534 serde_json::to_value(registrations).map_err(|err| {
535 crate::PluginError::Session(format!("failed to encode trigger registrations: {err}"))
536 })
537 }
538
539 async fn cancel_trigger_subscription(
540 &self,
541 payload: serde_json::Value,
542 ) -> Result<serde_json::Value, crate::PluginError> {
543 let router = self.dispatch.trigger_router.as_ref().ok_or_else(|| {
544 crate::PluginError::Session("trigger store is unavailable in this runtime".to_string())
545 })?;
546 let request = lashlang::TriggerCancelRequest::decode(&payload)
547 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
548 let changed = router
549 .store()
550 .cancel_subscription(&self.session_id, &request.handle)
551 .await?;
552 Ok(serde_json::json!(changed))
553 }
554
555 pub fn tool_argument_projection_policy(
556 &self,
557 name: &str,
558 ) -> crate::ToolArgumentProjectionPolicy {
559 crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
560 }
561
562 pub async fn start_lashlang_process(
563 &self,
564 registration: crate::ProcessRegistration,
565 label: Option<String>,
566 ) -> crate::ToolInvocationReply {
567 let _phase = self.named_phase("rlm_process.start");
568 let registration = match self
569 .attach_captured_process_execution_env(registration)
570 .await
571 {
572 Ok(registration) => registration,
573 Err(err) => {
574 return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
575 }
576 };
577 let process_id = registration.id.clone();
578 let mut options = crate::ProcessStartOptions::new()
579 .with_descriptor(crate::ProcessHandleDescriptor::new(Some("lashlang"), label));
580 if let Some(spawn) = self.process_spawn_provenance() {
581 options = options.with_spawn_provenance(spawn);
582 }
583 match self
584 .dispatch
585 .processes
586 .start(
587 &self.session_id,
588 registration,
589 options,
590 self.process_scope(self.parent_invocation.clone()),
591 )
592 .await
593 {
594 Ok(_) => {
595 self.record_started_process(&process_id);
596 crate::ToolInvocationReply::success(crate::lashlang_bridge::process_handle_json(
597 &process_id,
598 ))
599 }
600 Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
601 }
602 }
603
604 pub async fn sleep_lashlang(
605 &self,
606 scope: &str,
607 sequence: u64,
608 duration_ms: u64,
609 ) -> Result<(), crate::RuntimeEffectControllerError> {
610 let cancellation = self.cancellation_token.clone().unwrap_or_default();
611 let invocation = crate::runtime::causal::lashlang_sleep_invocation(
612 &self.session_id,
613 self.parent_invocation.as_ref(),
614 scope,
615 sequence,
616 );
617 let outcome = self
618 .dispatch
619 .effect_controller
620 .controller()
621 .execute_effect(
622 crate::RuntimeEffectEnvelope::new(
623 invocation,
624 crate::RuntimeEffectCommand::Sleep { duration_ms },
625 ),
626 crate::RuntimeEffectLocalExecutor::sleep(cancellation),
627 )
628 .await?;
629 match outcome {
630 crate::RuntimeEffectOutcome::Sleep => Ok(()),
631 other => Err(crate::RuntimeEffectControllerError::new(
632 "runtime_effect_wrong_outcome",
633 format!("expected sleep outcome, got {}", other.kind().as_str()),
634 )),
635 }
636 }
637
638 pub async fn await_process_event_lashlang(
639 &self,
640 _registry: Arc<dyn crate::ProcessRegistry>,
641 process_id: &str,
642 signal_name: &str,
643 _event_type: &str,
644 event_ordinal: u64,
645 ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
646 let cancellation = self.cancellation_token.clone().unwrap_or_default();
647 let key = self
648 .dispatch
649 .effect_controller
650 .controller()
651 .await_event_key(
652 &crate::ExecutionScope::process(process_id),
653 crate::AwaitEventWaitIdentity::process_signal(
654 process_id,
655 signal_name,
656 event_ordinal,
657 ),
658 )
659 .await?;
660 let invocation = crate::runtime::causal::lashlang_await_event_invocation(
661 &self.session_id,
662 self.parent_invocation.as_ref(),
663 process_id,
664 signal_name,
665 event_ordinal,
666 );
667 let outcome = self
668 .dispatch
669 .effect_controller
670 .controller()
671 .execute_effect(
672 crate::RuntimeEffectEnvelope::new(
673 invocation,
674 crate::RuntimeEffectCommand::AwaitEvent { key },
675 ),
676 crate::RuntimeEffectLocalExecutor::await_event(cancellation, None),
677 )
678 .await?;
679 match outcome.into_await_event()? {
680 crate::Resolution::Ok(value) => Ok(value),
681 crate::Resolution::Err(err) => Err(crate::RuntimeEffectControllerError::new(
682 err.code,
683 err.message,
684 )),
685 crate::Resolution::Timeout => Err(crate::RuntimeEffectControllerError::new(
686 "process_signal_wait_timeout",
687 "process signal wait timed out",
688 )),
689 crate::Resolution::Cancelled => Err(crate::RuntimeEffectControllerError::new(
690 "process_signal_wait_cancelled",
691 "process signal wait was cancelled",
692 )),
693 }
694 }
695
696 pub async fn signal_lashlang_process(
697 &self,
698 registry: Arc<dyn crate::ProcessRegistry>,
699 process_id: &str,
700 signal_name: &str,
701 signal_id: String,
702 payload: serde_json::Value,
703 ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
704 let event_type = crate::process_signal_event_type(signal_name)?;
705 let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
706 let signal_payload = payload.clone();
707 let command = crate::ProcessCommand::Signal {
708 process_id: process_id.to_string(),
709 signal_name: signal_name.to_string(),
710 signal_id,
711 request: crate::ProcessEventAppendRequest::new(event_type.clone(), payload)
712 .with_replay_key(replay_key),
713 };
714 let effect_id = command.effect_id();
715 let invocation = crate::runtime::causal::process_effect_invocation(
716 &self.session_id,
717 self.parent_invocation.clone(),
718 &effect_id,
719 );
720 let outcome = self
721 .dispatch
722 .effect_controller
723 .controller()
724 .execute_effect(
725 crate::RuntimeEffectEnvelope::new(
726 invocation,
727 crate::RuntimeEffectCommand::process(command),
728 ),
729 crate::RuntimeEffectLocalExecutor::processes(Arc::clone(®istry)),
730 )
731 .await?;
732 match outcome.into_process()? {
733 crate::ProcessEffectOutcome::Signal { event } => {
734 let waiting_ordinal =
735 registry
736 .get_process(process_id)
737 .await
738 .and_then(|record| match record.wait {
739 Some(crate::WaitState {
740 kind:
741 crate::WaitKind::Signal {
742 name,
743 event_type: wait_event_type,
744 ordinal,
745 ..
746 },
747 ..
748 }) if name == signal_name && wait_event_type == event_type => {
749 Some(ordinal)
750 }
751 _ => None,
752 });
753 let ordinal = match waiting_ordinal {
754 Some(ordinal) => ordinal,
755 None => {
756 registry
757 .count_events_through(process_id, &event_type, event.sequence)
758 .await?
759 }
760 };
761 if ordinal > 0 {
762 let key = self
763 .dispatch
764 .effect_controller
765 .controller()
766 .await_event_key(
767 &crate::ExecutionScope::process(process_id),
768 crate::AwaitEventWaitIdentity::process_signal(
769 process_id,
770 signal_name,
771 ordinal,
772 ),
773 )
774 .await?;
775 let _ = self
776 .dispatch
777 .effect_controller
778 .controller()
779 .resolve_await_event(&key, crate::Resolution::Ok(signal_payload))
780 .await?;
781 }
782 Ok(event)
783 }
784 other => Err(crate::RuntimeEffectControllerError::new(
785 "runtime_effect_wrong_outcome",
786 format!("expected signal outcome, got {other:?}"),
787 )),
788 }
789 }
790}
791
792#[cfg(test)]
793mod tests {
794 use super::*;
795 use crate::tool_dispatch::ToolDispatchContext;
796 use crate::{ToolCall, ToolProvider, ToolResult};
797
798 struct NoopTools;
799
800 #[async_trait::async_trait]
801 impl ToolProvider for NoopTools {
802 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
803 Vec::new()
804 }
805
806 fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
807 None
808 }
809
810 async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
811 ToolResult::err_fmt("not used")
812 }
813 }
814
815 #[test]
816 fn tool_argument_projection_policy_resolves_from_active_catalog_and_defaults_unknown() {
817 let tool = crate::ToolDefinition::raw(
818 "tool:seedy",
819 "seedy",
820 "Seed-aware",
821 crate::ToolDefinition::default_input_schema(),
822 serde_json::json!({ "type": "string" }),
823 )
824 .with_argument_projection(
825 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
826 );
827 let plugins = crate::plugin::PluginHost::empty()
828 .build_session("session", None)
829 .expect("plugin session");
830 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
831 let dispatch = Arc::new(ToolDispatchContext {
832 plugins,
833 tools: Arc::new(NoopTools),
834 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
835 vec![tool.manifest()],
836 std::collections::BTreeMap::new(),
837 )),
838 sessions: Arc::new(crate::testing::MockSessionManager::default()),
839 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
840 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
841 processes: Arc::new(crate::UnavailableProcessService),
842 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
843 trigger_router: None,
844 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
845 crate::InlineRuntimeEffectController,
846 )),
847 direct_completions: crate::DirectCompletionClient::unavailable(
848 "direct completions are unavailable in this test context",
849 ),
850 parent_invocation: None,
851 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
852 crate::PluginOptions::default(),
853 crate::SessionPolicy::default(),
854 ),
855 session_id: "session".to_string(),
856 agent_frame_id: String::new(),
857 event_tx,
858 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
859 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
860 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
861 turn_context: crate::TurnContext::default(),
862 });
863 let ctx = RuntimeExecutionContext::new(
864 "session".to_string(),
865 dispatch,
866 Default::default(),
867 Default::default(),
868 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
869 Arc::new(crate::InMemoryAttachmentStore::new()),
870 Arc::new(crate::ChronologicalProjection::default()),
871 None,
872 crate::TurnContext::default(),
873 );
874
875 assert_eq!(
876 ctx.tool_argument_projection_policy("seedy"),
877 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
878 );
879 assert_eq!(
880 ctx.tool_argument_projection_policy("missing"),
881 crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
882 );
883 }
884
885 #[tokio::test]
886 async fn prepare_lashlang_process_start_captures_tool_ids_and_explicit_input() {
887 let tool = crate::ToolDefinition::raw(
888 "tool:alpha",
889 "alpha",
890 "Alpha tool.",
891 crate::ToolDefinition::default_input_schema(),
892 serde_json::json!({ "type": "object", "additionalProperties": true }),
893 );
894 let plugins = crate::plugin::PluginHost::empty()
895 .build_session("session", None)
896 .expect("plugin session");
897 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
898 let dispatch = Arc::new(ToolDispatchContext {
899 plugins,
900 tools: Arc::new(NoopTools),
901 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
902 vec![tool.manifest()],
903 std::collections::BTreeMap::new(),
904 )),
905 sessions: Arc::new(crate::testing::MockSessionManager::default()),
906 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
907 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
908 processes: Arc::new(crate::UnavailableProcessService),
909 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
910 trigger_router: None,
911 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
912 crate::InlineRuntimeEffectController,
913 )),
914 direct_completions: crate::DirectCompletionClient::unavailable(
915 "direct completions are unavailable in this test context",
916 ),
917 parent_invocation: None,
918 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
919 crate::PluginOptions::default(),
920 crate::SessionPolicy::default(),
921 ),
922 session_id: "session".to_string(),
923 agent_frame_id: String::new(),
924 event_tx,
925 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
926 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
927 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
928 turn_context: crate::TurnContext::default(),
929 });
930 let ctx = RuntimeExecutionContext::new(
931 "session".to_string(),
932 dispatch,
933 lashlang::LashlangAbilities::default().with_processes(),
934 Default::default(),
935 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
936 Arc::new(crate::InMemoryAttachmentStore::new()),
937 Arc::new(crate::ChronologicalProjection::default()),
938 None,
939 crate::TurnContext::default(),
940 );
941 let mut input = lashlang::Record::new();
942 input.insert("root".to_string(), lashlang::Value::String(".".into()));
943 let linked = ctx
944 .link_lashlang_module(
945 lashlang::parse("process scan(root: str) { finish root }").expect("process module"),
946 )
947 .expect("link process module");
948 ctx.put_lashlang_module_artifact(&linked.artifact)
949 .await
950 .expect("store module artifact");
951 let process_ref = linked
952 .artifact
953 .process_ref("scan")
954 .expect("scan process ref")
955 .clone();
956 let (registration, label) = ctx
957 .prepare_lashlang_process_start(lashlang::ProcessStart {
958 module_ref: linked.module_ref.clone(),
959 process_ref,
960 host_requirements_ref: linked.host_requirements_ref.clone(),
961 process_name: "scan".to_string(),
962 args: input,
963 })
964 .await
965 .expect("process start should prepare");
966
967 assert_eq!(label.as_deref(), Some("scan"));
968 assert!(
969 registration
970 .event_types
971 .iter()
972 .any(|event_type| event_type.name == "process.wake")
973 );
974 let crate::ProcessInput::LashlangProcess {
975 args, process_name, ..
976 } = registration.input.as_ref()
977 else {
978 panic!("expected lashlang process input");
979 };
980 assert_eq!(process_name, "scan");
981 assert_eq!(args.get("root"), Some(&serde_json::json!(".")));
982 }
983
984 #[test]
985 fn lashlang_host_environment_reflects_host_abilities() {
986 let tool = crate::ToolDefinition::raw(
987 "tool:alpha",
988 "alpha",
989 "Alpha tool.",
990 crate::ToolDefinition::default_input_schema(),
991 serde_json::json!({ "type": "object", "additionalProperties": true }),
992 );
993 let plugins = crate::plugin::PluginHost::empty()
994 .build_session("session", None)
995 .expect("plugin session");
996 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
997 let dispatch = Arc::new(ToolDispatchContext {
998 plugins,
999 tools: Arc::new(NoopTools),
1000 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
1001 vec![tool.manifest()],
1002 std::collections::BTreeMap::new(),
1003 )),
1004 sessions: Arc::new(crate::testing::MockSessionManager::default()),
1005 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1006 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1007 processes: Arc::new(crate::UnavailableProcessService),
1008 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1009 trigger_router: None,
1010 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1011 crate::InlineRuntimeEffectController,
1012 )),
1013 direct_completions: crate::DirectCompletionClient::unavailable(
1014 "direct completions are unavailable in this test context",
1015 ),
1016 parent_invocation: None,
1017 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1018 crate::PluginOptions::default(),
1019 crate::SessionPolicy::default(),
1020 ),
1021 session_id: "session".to_string(),
1022 agent_frame_id: String::new(),
1023 event_tx,
1024 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1025 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
1026 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1027 turn_context: crate::TurnContext::default(),
1028 });
1029 let ctx = RuntimeExecutionContext::new(
1030 "session".to_string(),
1031 dispatch,
1032 lashlang::LashlangAbilities::default()
1033 .with_sleep()
1034 .with_processes()
1035 .with_process_signals(),
1036 Default::default(),
1037 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1038 Arc::new(crate::InMemoryAttachmentStore::new()),
1039 Arc::new(crate::ChronologicalProjection::default()),
1040 None,
1041 crate::TurnContext::default(),
1042 );
1043
1044 let environment = ctx.lashlang_host_environment();
1045
1046 assert!(std::ptr::eq(environment, ctx.lashlang_host_environment()));
1047 assert!(environment.abilities.processes);
1048 assert!(environment.abilities.sleep);
1049 assert!(environment.abilities.process_signals);
1050 assert!(!environment.abilities.triggers);
1051 assert!(
1052 environment
1053 .resources
1054 .resolve_operation("Tools", "alpha")
1055 .is_some()
1056 );
1057 }
1058
1059 #[test]
1060 fn lashlang_host_environment_reflects_host_resource_contributions() {
1061 let mut resources = lashlang::LashlangHostCatalog::new();
1062 resources
1063 .add_trigger_source_constructor(
1064 ["clock", "Alarm"],
1065 lashlang::TypeExpr::Object(vec![lashlang::TypeField {
1066 name: "at".into(),
1067 ty: lashlang::TypeExpr::Str,
1068 optional: false,
1069 }]),
1070 lashlang::NamedDataType::object(
1071 "clock.Tick",
1072 vec![lashlang::TypeField {
1073 name: "fired_at".into(),
1074 ty: lashlang::TypeExpr::Str,
1075 optional: false,
1076 }],
1077 )
1078 .expect("valid clock tick type"),
1079 )
1080 .expect("valid clock trigger source");
1081 let plugin_host = crate::plugin::PluginHost::empty();
1082 let mut merged_resources = plugin_host.lashlang_resources();
1083 merged_resources.extend(resources);
1084 let plugins = plugin_host
1085 .with_lashlang_resources(merged_resources)
1086 .build_session("session", None)
1087 .expect("plugin session");
1088 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
1089 let dispatch = Arc::new(ToolDispatchContext {
1090 plugins,
1091 tools: Arc::new(NoopTools),
1092 tool_catalog: Arc::new(crate::ToolCatalog::from_tools(
1093 Vec::new(),
1094 std::collections::BTreeMap::new(),
1095 )),
1096 sessions: Arc::new(crate::testing::MockSessionManager::default()),
1097 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
1098 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
1099 processes: Arc::new(crate::UnavailableProcessService),
1100 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
1101 trigger_router: None,
1102 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
1103 crate::InlineRuntimeEffectController,
1104 )),
1105 direct_completions: crate::DirectCompletionClient::unavailable(
1106 "direct completions are unavailable in this test context",
1107 ),
1108 parent_invocation: None,
1109 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
1110 crate::PluginOptions::default(),
1111 crate::SessionPolicy::default(),
1112 ),
1113 session_id: "session".to_string(),
1114 agent_frame_id: String::new(),
1115 event_tx,
1116 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
1117 trigger_outcomes: crate::tool_dispatch::ToolTriggerOutcomeBuffer::default(),
1118 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
1119 turn_context: crate::TurnContext::default(),
1120 });
1121 let ctx = RuntimeExecutionContext::new(
1122 "session".to_string(),
1123 dispatch,
1124 lashlang::LashlangAbilities::default()
1125 .with_processes()
1126 .with_triggers(),
1127 Default::default(),
1128 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
1129 Arc::new(crate::InMemoryAttachmentStore::new()),
1130 Arc::new(crate::ChronologicalProjection::default()),
1131 None,
1132 crate::TurnContext::default(),
1133 );
1134
1135 let host_environment = ctx.lashlang_host_environment();
1136
1137 assert!(
1138 host_environment
1139 .resources
1140 .resolve_value_constructor(&["clock", "Alarm"])
1141 .is_some()
1142 );
1143 assert!(
1144 host_environment
1145 .resources
1146 .resolve_trigger_source("clock.Alarm")
1147 .is_some()
1148 );
1149 lashlang::LinkedModule::link(
1150 lashlang::parse(
1151 r#"
1152 process remember(tick: clock.Tick) {
1153 finish true
1154 }
1155
1156 source = clock.Alarm({ at: "08:00" })
1157 await triggers.register({
1158 source: source,
1159 target: remember,
1160 inputs: { tick: trigger.event }
1161 })?
1162 "#,
1163 )
1164 .expect("parse trigger registry module"),
1165 host_environment,
1166 )
1167 .expect("host resource contribution should be linkable");
1168 }
1169}