1use std::sync::Arc;
2
3use tokio::sync::mpsc::Sender;
4use tokio_util::sync::CancellationToken;
5
6use crate::tool_dispatch::ToolDispatchContext;
7use crate::{TurnActivity, TurnActivityId, TurnEvent};
8
9pub(crate) fn lashlang_surface_from_tool_surface(
10 surface: &crate::ToolSurface,
11 abilities: lashlang::LashlangAbilities,
12 language_features: lashlang::LashlangLanguageFeatures,
13 host_resources: lashlang::ResourceCatalog,
14) -> lashlang::LashlangSurface {
15 let mut resources = lashlang_resources_from_tool_surface(surface);
16 resources.extend(host_resources);
17 lashlang::LashlangSurface::new(resources, abilities).with_language_features(language_features)
18}
19
20pub(crate) fn lashlang_resources_from_tool_surface(
21 surface: &crate::ToolSurface,
22) -> lashlang::ResourceCatalog {
23 let mut catalog = lashlang::ResourceCatalog::new();
24 for entry in surface.tools.iter() {
25 if entry.availability.is_callable() {
26 let agent_surface = entry
27 .manifest
28 .agent_surface
29 .executable_for(&entry.manifest.name);
30 catalog.add_module_operation(
31 agent_surface.module_path.iter().map(String::as_str),
32 agent_surface.authority_type.clone(),
33 agent_surface.operation.clone(),
34 entry.manifest.name.clone(),
35 lashlang::TypeExpr::Any,
36 lashlang::TypeExpr::Any,
37 );
38 }
39 }
40 catalog
41}
42
/// Per-run execution context used by the Lashlang bridge.
///
/// Bundles the shared tool dispatch context with the Lashlang surface, the
/// artifact and attachment stores, process provenance, tracing state and the
/// optional cancellation token that the methods on this type read.
#[derive(Clone)]
pub struct RuntimeExecutionContext<'run> {
    pub(super) session_id: String,
    pub(super) dispatch: Arc<ToolDispatchContext<'run>>,
    lashlang_abilities: lashlang::LashlangAbilities,
    lashlang_language_features: lashlang::LashlangLanguageFeatures,
    // Built once in `new` from the dispatch tool surface and the plugin resources.
    lashlang_surface: lashlang::LashlangSurface,
    lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
    attachment_store: Arc<dyn crate::AttachmentStore>,
    chronological_projection: Arc<crate::ChronologicalProjection>,
    protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
    turn_context: crate::TurnContext,
    // `new` seeds this from default plugin options and session policy;
    // `with_execution_env_spec` replaces it.
    execution_env_spec: crate::ProcessExecutionEnvSpec,
    // The three `process_*` fields are populated by `with_process_registration_context`.
    process_originator: Option<crate::ProcessOriginator>,
    process_env_ref: Option<crate::ProcessExecutionEnvRef>,
    process_wake_target: Option<crate::SessionScope>,
    pub(super) parent_invocation: Option<crate::RuntimeInvocation>,
    lashlang_execution_sink: Option<Arc<dyn lash_trace::TraceSink>>,
    lashlang_execution_context: lash_trace::TraceContext,
    pub(super) turn_event_tx: Option<Sender<TurnActivity>>,
    pub(super) cancellation_token: Option<CancellationToken>,
    // Ids inserted by `record_started_process`; because the set sits behind an
    // `Arc`, clones of this context observe the same set.
    started_process_ids: Arc<std::sync::Mutex<std::collections::HashSet<String>>>,
}
70
71impl<'run> RuntimeExecutionContext<'run> {
72 pub(crate) fn drain_tool_host_event_outcomes(
73 &self,
74 ) -> Result<Vec<crate::tool_dispatch::ToolHostEventEffectOutcome>, crate::PluginError> {
75 self.dispatch
76 .host_event_outcomes
77 .drain()
78 .map_err(crate::PluginError::Session)
79 }
80
81 pub(super) fn process_scope(
82 &self,
83 parent_invocation: Option<crate::RuntimeInvocation>,
84 ) -> crate::ProcessOpScope<'_> {
85 crate::ProcessOpScope::new(self.dispatch.effect_controller.scoped())
86 .with_parent_invocation(parent_invocation)
87 .with_agent_frame_id(Some(self.dispatch.agent_frame_id.clone()))
88 }
89
90 #[allow(
91 clippy::too_many_arguments,
92 reason = "code execution bridge carries explicit per-turn runtime dependencies"
93 )]
94 pub(crate) fn new(
95 session_id: String,
96 dispatch: Arc<ToolDispatchContext<'run>>,
97 lashlang_abilities: lashlang::LashlangAbilities,
98 lashlang_language_features: lashlang::LashlangLanguageFeatures,
99 lashlang_artifact_store: Arc<dyn lashlang::LashlangArtifactStore>,
100 attachment_store: Arc<dyn crate::AttachmentStore>,
101 chronological_projection: Arc<crate::ChronologicalProjection>,
102 protocol_extension: Option<crate::ProtocolTurnExtensionHandle>,
103 turn_context: crate::TurnContext,
104 ) -> Self {
105 let lashlang_surface = lashlang_surface_from_tool_surface(
106 &dispatch.surface,
107 lashlang_abilities,
108 lashlang_language_features,
109 dispatch.plugins.lashlang_resources(),
110 );
111 Self {
112 session_id,
113 dispatch,
114 lashlang_abilities,
115 lashlang_language_features,
116 lashlang_surface,
117 lashlang_artifact_store,
118 attachment_store,
119 chronological_projection,
120 protocol_extension,
121 turn_context,
122 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
123 crate::PluginOptions::default(),
124 crate::SessionPolicy::default(),
125 ),
126 process_originator: None,
127 started_process_ids: Arc::default(),
128 process_env_ref: None,
129 process_wake_target: None,
130 parent_invocation: None,
131 lashlang_execution_sink: None,
132 lashlang_execution_context: lash_trace::TraceContext::default(),
133 turn_event_tx: None,
134 cancellation_token: None,
135 }
136 }
137
138 pub fn session_id(&self) -> &str {
139 &self.session_id
140 }
141
142 pub fn attachment_store(&self) -> Arc<dyn crate::AttachmentStore> {
143 Arc::clone(&self.attachment_store)
144 }
145
146 pub async fn put_lashlang_module_artifact(
147 &self,
148 artifact: &lashlang::ModuleArtifact,
149 ) -> Result<(), String> {
150 self.lashlang_artifact_store
151 .put_module_artifact(artifact)
152 .await
153 .map_err(|err| err.to_string())
154 }
155
156 pub fn chronological_projection(&self) -> Arc<crate::ChronologicalProjection> {
157 Arc::clone(&self.chronological_projection)
158 }
159
160 pub fn protocol_extension<T: 'static>(&self) -> Option<&T> {
161 self.protocol_extension
162 .as_ref()
163 .and_then(|extension| extension.as_any().downcast_ref::<T>())
164 }
165
166 pub fn turn_context(&self) -> &crate::TurnContext {
167 &self.turn_context
168 }
169
170 pub(crate) fn session_graph_service(&self) -> &dyn crate::plugin::SessionGraphService {
171 self.dispatch.session_graph.as_ref()
172 }
173
174 pub(super) async fn emit_turn_activity(
175 &self,
176 correlation_id: TurnActivityId,
177 event: TurnEvent,
178 ) {
179 if let Some(tx) = &self.turn_event_tx {
180 let _ = tx.send(TurnActivity::new(correlation_id, event)).await;
181 }
182 }
183
184 pub(crate) fn with_turn_event_sender(mut self, turn_event_tx: Sender<TurnActivity>) -> Self {
185 self.turn_event_tx = Some(turn_event_tx);
186 self
187 }
188
189 pub(crate) fn with_parent_invocation(mut self, metadata: crate::RuntimeInvocation) -> Self {
190 self.parent_invocation = Some(metadata);
191 self
192 }
193
194 pub(crate) fn with_execution_env_spec(
195 mut self,
196 execution_env_spec: crate::ProcessExecutionEnvSpec,
197 ) -> Self {
198 self.execution_env_spec = execution_env_spec;
199 self
200 }
201
202 pub(crate) fn with_process_registration_context(
203 mut self,
204 registration: &crate::ProcessRegistration,
205 ) -> Self {
206 self.process_originator = Some(registration.provenance.originator.clone());
207 self.process_env_ref = registration.env_ref.clone();
208 self.process_wake_target = registration.wake_target.clone();
209 self
210 }
211
212 pub(super) fn record_started_process(&self, process_id: &str) {
216 self.started_process_ids
217 .lock()
218 .expect("started process ids lock")
219 .insert(process_id.to_string());
220 }
221
222 pub(super) fn is_run_local_process(&self, process_id: &str) -> bool {
223 self.started_process_ids
224 .lock()
225 .expect("started process ids lock")
226 .contains(process_id)
227 }
228
229 pub(crate) fn process_spawn_provenance(&self) -> Option<crate::ProcessSpawnProvenance> {
230 self.process_originator
231 .clone()
232 .map(|originator| crate::ProcessSpawnProvenance {
233 originator,
234 wake_target: self.process_wake_target.clone(),
235 })
236 }
237
238 pub(super) async fn attach_captured_process_execution_env(
239 &self,
240 registration: crate::ProcessRegistration,
241 ) -> Result<crate::ProcessRegistration, crate::PluginError> {
242 if registration.env_ref.is_some() {
243 return Ok(registration);
244 }
245 match registration.input.as_ref() {
246 crate::ProcessInput::ToolCall { .. } | crate::ProcessInput::LashlangProcess { .. } => {
247 let env_ref = self.captured_process_execution_env_ref().await?;
248 Ok(registration.with_execution_env_ref(Some(env_ref)))
249 }
250 crate::ProcessInput::External { .. } | crate::ProcessInput::SessionTurn { .. } => {
251 Ok(registration)
252 }
253 }
254 }
255
256 async fn captured_process_execution_env_ref(
257 &self,
258 ) -> Result<crate::ProcessExecutionEnvRef, crate::PluginError> {
259 if let Some(env_ref) = self.process_env_ref.clone() {
260 return Ok(env_ref);
261 }
262 crate::persist_process_execution_env(
263 self.lashlang_artifact_store.as_ref(),
264 &self.execution_env_spec,
265 )
266 .await
267 }
268
269 pub(crate) fn with_lashlang_execution_trace(
270 mut self,
271 sink: Option<Arc<dyn lash_trace::TraceSink>>,
272 context: lash_trace::TraceContext,
273 ) -> Self {
274 self.lashlang_execution_sink = sink;
275 self.lashlang_execution_context = context;
276 self
277 }
278
279 pub fn parent_invocation(&self) -> Option<&crate::RuntimeInvocation> {
280 self.parent_invocation.as_ref()
281 }
282
283 pub fn lashlang_execution_sink(&self) -> Option<Arc<dyn lash_trace::TraceSink>> {
284 self.lashlang_execution_sink.clone()
285 }
286
287 pub fn lashlang_execution_context(&self) -> &lash_trace::TraceContext {
288 &self.lashlang_execution_context
289 }
290
291 pub(crate) fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
292 self.cancellation_token = Some(cancellation_token);
293 self
294 }
295
296 pub(crate) fn tool_scheduling(&self, name: &str) -> crate::ToolScheduling {
297 crate::tool_dispatch::resolve_tool_scheduling(&self.dispatch, name)
298 }
299
300 pub fn callable_tool_manifest(&self, name: &str) -> Option<crate::ToolManifest> {
301 crate::tool_dispatch::resolve_callable_manifest(&self.dispatch, name)
302 }
303
304 pub fn callable_tool_manifest_by_id(&self, id: &crate::ToolId) -> Option<crate::ToolManifest> {
305 crate::tool_dispatch::resolve_callable_manifest_by_id(&self.dispatch, id)
306 }
307
308 pub fn resolve_lashlang_host_operation(
309 &self,
310 receiver: &lashlang::ResourceHandle,
311 operation: &str,
312 ) -> Result<String, String> {
313 self.lashlang_surface
314 .resources
315 .resolve_module_operation(&receiver.resource_type, &receiver.alias, operation)
316 .map(|binding| binding.host_operation.clone())
317 .ok_or_else(|| {
318 format!(
319 "module `{}` of type `{}` does not expose operation `{operation}`",
320 receiver.alias, receiver.resource_type
321 )
322 })
323 }
324
325 pub async fn prepare_lashlang_process_start(
326 &self,
327 start: lashlang::ProcessStart,
328 ) -> Result<(crate::ProcessRegistration, Option<String>), String> {
329 let display_name = Some(start.process_name.clone());
330 let artifact = self
331 .lashlang_artifact_store
332 .get_module_artifact(&start.module_ref)
333 .await
334 .map_err(|err| format!("failed to load lashlang module artifact: {err}"))?
335 .ok_or_else(|| {
336 format!(
337 "missing lashlang module artifact `{}` for process `{}`",
338 start.module_ref, start.process_name
339 )
340 })?;
341 if artifact.required_surface_ref != start.required_surface_ref {
342 return Err(format!(
343 "lashlang module artifact `{}` required surface mismatch: process requested {}, artifact has {}",
344 start.module_ref, start.required_surface_ref, artifact.required_surface_ref
345 ));
346 }
347 if artifact.process_ref(&start.process_name) != Some(&start.process_ref) {
348 return Err(format!(
349 "lashlang module artifact `{}` does not export process `{}` as requested ref {:?}",
350 start.module_ref, start.process_name, start.process_ref
351 ));
352 }
353 let args = match serde_json::to_value(lashlang::Value::Record(Arc::new(start.args)))
354 .map_err(|err| format!("failed to serialize process args: {err}"))?
355 {
356 serde_json::Value::Object(map) => map,
357 _ => return Err("process args must serialize as a record".to_string()),
358 };
359 let signal_event_types = artifact
360 .canonical_ir
361 .process(&start.process_name)
362 .map(crate::lashlang_process_signal_event_types)
363 .unwrap_or_default();
364 let process_id = format!("process:{}", uuid::Uuid::new_v4());
365 let registration = crate::ProcessRegistration::session_start_draft(
366 process_id,
367 crate::ProcessInput::LashlangProcess {
368 module_ref: start.module_ref,
369 process_ref: start.process_ref,
370 required_surface_ref: start.required_surface_ref,
371 process_name: start.process_name,
372 args,
373 },
374 )
375 .with_extra_event_types(
376 crate::lashlang_process_event_types()
377 .into_iter()
378 .chain(signal_event_types),
379 );
380 Ok((registration, display_name))
381 }
382
383 pub fn lashlang_surface(&self) -> &lashlang::LashlangSurface {
384 &self.lashlang_surface
385 }
386
387 pub fn lashlang_abilities(&self) -> lashlang::LashlangAbilities {
388 self.lashlang_abilities
389 }
390
391 pub fn lashlang_language_features(&self) -> lashlang::LashlangLanguageFeatures {
392 self.lashlang_language_features
393 }
394
395 pub fn link_lashlang_module(
396 &self,
397 program: lashlang::Program,
398 ) -> Result<lashlang::LinkedModule, String> {
399 lashlang::LinkedModule::link(program, self.lashlang_surface())
400 .map_err(|err| err.to_string())
401 }
402
403 pub async fn perform_lashlang_trigger_operation(
404 &self,
405 operation: &str,
406 payload: serde_json::Value,
407 ) -> Result<serde_json::Value, String> {
408 match lashlang::TriggerHostOperation::from_host_operation(operation) {
409 Some(lashlang::TriggerHostOperation::Register) => self
410 .register_host_event_subscription(payload)
411 .await
412 .map_err(|err| err.to_string()),
413 Some(lashlang::TriggerHostOperation::List) => self
414 .list_host_event_subscriptions(payload)
415 .await
416 .map_err(|err| err.to_string()),
417 Some(lashlang::TriggerHostOperation::Cancel) => self
418 .cancel_host_event_subscription(payload)
419 .await
420 .map_err(|err| err.to_string()),
421 None => Err(format!("unknown trigger operation `{operation}`")),
422 }
423 }
424
425 async fn register_host_event_subscription(
426 &self,
427 payload: serde_json::Value,
428 ) -> Result<serde_json::Value, crate::PluginError> {
429 let router = self.dispatch.host_event_router.as_ref().ok_or_else(|| {
430 crate::PluginError::Session(
431 "host event store is unavailable in this runtime".to_string(),
432 )
433 })?;
434 let request = lashlang::TriggerRegistrationRequest::decode(&payload)
435 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
436 let source_type = request.source.source_type.clone();
437 let source_value = request.source.value.clone();
438 let source = request.source.to_json();
439 let event_type = lashlang::event_type_for_source(
440 &self.dispatch.plugins.lashlang_resources(),
441 &source_type,
442 )
443 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
444 let validation = crate::plugin::validate_target_process(
445 &request.target,
446 &event_type,
447 &request.inputs,
448 self.lashlang_artifact_store.as_ref(),
449 )
450 .await?;
451 let store = router.store();
452 let source_key = store
453 .source_key_for_subscription(&source_type, &source_value)
454 .await?;
455 let env_ref = match self.process_env_ref.clone() {
456 Some(env_ref) => env_ref,
457 None => {
458 crate::persist_process_execution_env(
459 self.lashlang_artifact_store.as_ref(),
460 &self.execution_env_spec,
461 )
462 .await?
463 }
464 };
465 let registrant = self.process_originator.clone().unwrap_or_else(|| {
466 crate::ProcessOriginator::session(crate::SessionScope::new(self.session_id.clone()))
467 });
468 let wake_target = self
469 .process_wake_target
470 .clone()
471 .or_else(|| match ®istrant {
472 crate::ProcessOriginator::Session { scope } => Some(scope.clone()),
473 crate::ProcessOriginator::Host => None,
474 });
475 let record = store
476 .register_subscription(crate::TriggerSubscriptionDraft {
477 registrant,
478 env_ref,
479 wake_target,
480 name: request.name,
481 source_type,
482 source_key,
483 source,
484 event_ty: validation.event_ty,
485 module_ref: request.target.module_ref,
486 required_surface_ref: request.target.required_surface_ref,
487 process_ref: request.target.process_ref,
488 process_name: request.target.process_name,
489 input_template: validation.inputs,
490 })
491 .await?;
492 Ok(crate::plugin::trigger_handle_json(&record.handle))
493 }
494
495 async fn list_host_event_subscriptions(
496 &self,
497 payload: serde_json::Value,
498 ) -> Result<serde_json::Value, crate::PluginError> {
499 let router = self.dispatch.host_event_router.as_ref().ok_or_else(|| {
500 crate::PluginError::Session(
501 "host event store is unavailable in this runtime".to_string(),
502 )
503 })?;
504 let request = lashlang::TriggerListRequest::decode(&payload)
505 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
506 let mut filter = crate::TriggerSubscriptionFilter::for_session(&self.session_id);
507 filter.target = request.target;
508 filter.name = request.name;
509 filter.source_type = request.source_type;
510 filter.enabled = request.enabled;
511 let registrations = router
512 .store()
513 .list_subscriptions(filter)
514 .await?
515 .iter()
516 .map(crate::TriggerRegistration::from)
517 .collect::<Vec<_>>();
518 serde_json::to_value(registrations).map_err(|err| {
519 crate::PluginError::Session(format!("failed to encode trigger registrations: {err}"))
520 })
521 }
522
523 async fn cancel_host_event_subscription(
524 &self,
525 payload: serde_json::Value,
526 ) -> Result<serde_json::Value, crate::PluginError> {
527 let router = self.dispatch.host_event_router.as_ref().ok_or_else(|| {
528 crate::PluginError::Session(
529 "host event store is unavailable in this runtime".to_string(),
530 )
531 })?;
532 let request = lashlang::TriggerCancelRequest::decode(&payload)
533 .map_err(|err| crate::PluginError::Session(err.to_string()))?;
534 let changed = router
535 .store()
536 .cancel_subscription(&self.session_id, &request.handle)
537 .await?;
538 Ok(serde_json::json!(changed))
539 }
540
541 pub fn tool_argument_projection_policy(
542 &self,
543 name: &str,
544 ) -> crate::ToolArgumentProjectionPolicy {
545 crate::tool_dispatch::resolve_tool_argument_projection_policy(&self.dispatch, name)
546 }
547
548 pub async fn start_lashlang_process(
549 &self,
550 registration: crate::ProcessRegistration,
551 label: Option<String>,
552 ) -> crate::ToolInvocationReply {
553 let registration = match self
554 .attach_captured_process_execution_env(registration)
555 .await
556 {
557 Ok(registration) => registration,
558 Err(err) => {
559 return crate::ToolInvocationReply::error(serde_json::json!(err.to_string()));
560 }
561 };
562 let process_id = registration.id.clone();
563 let mut options = crate::ProcessStartOptions::new()
564 .with_descriptor(crate::ProcessHandleDescriptor::new(Some("lashlang"), label));
565 if let Some(spawn) = self.process_spawn_provenance() {
566 options = options.with_spawn_provenance(spawn);
567 }
568 match self
569 .dispatch
570 .processes
571 .start(
572 &self.session_id,
573 registration,
574 options,
575 self.process_scope(self.parent_invocation.clone()),
576 )
577 .await
578 {
579 Ok(_) => {
580 self.record_started_process(&process_id);
581 crate::ToolInvocationReply::success(crate::lashlang_bridge::process_handle_json(
582 &process_id,
583 ))
584 }
585 Err(err) => crate::ToolInvocationReply::error(serde_json::json!(err.to_string())),
586 }
587 }
588
589 pub async fn sleep_lashlang(
590 &self,
591 scope: &str,
592 sequence: u64,
593 duration_ms: u64,
594 ) -> Result<(), crate::RuntimeEffectControllerError> {
595 let cancellation = self.cancellation_token.clone().unwrap_or_default();
596 let invocation = crate::runtime::causal::lashlang_sleep_invocation(
597 &self.session_id,
598 self.parent_invocation.as_ref(),
599 scope,
600 sequence,
601 );
602 let outcome = self
603 .dispatch
604 .effect_controller
605 .controller()
606 .execute_effect(
607 crate::RuntimeEffectEnvelope::new(
608 invocation,
609 crate::RuntimeEffectCommand::Sleep { duration_ms },
610 ),
611 crate::RuntimeEffectLocalExecutor::sleep(cancellation),
612 )
613 .await?;
614 match outcome {
615 crate::RuntimeEffectOutcome::Sleep => Ok(()),
616 other => Err(crate::RuntimeEffectControllerError::new(
617 "runtime_effect_wrong_outcome",
618 format!("expected sleep outcome, got {}", other.kind().as_str()),
619 )),
620 }
621 }
622
623 pub async fn await_process_event_lashlang(
624 &self,
625 registry: Arc<dyn crate::ProcessRegistry>,
626 process_id: &str,
627 signal_name: &str,
628 event_type: &str,
629 event_ordinal: u64,
630 ) -> Result<serde_json::Value, crate::RuntimeEffectControllerError> {
631 let cancellation = self.cancellation_token.clone().unwrap_or_default();
632 let key = crate::process_signal_wait_key(process_id, signal_name, event_ordinal);
633 let invocation = crate::runtime::causal::lashlang_await_event_invocation(
634 &self.session_id,
635 self.parent_invocation.as_ref(),
636 process_id,
637 signal_name,
638 event_ordinal,
639 );
640 let outcome = self
641 .dispatch
642 .effect_controller
643 .controller()
644 .execute_effect(
645 crate::RuntimeEffectEnvelope::new(
646 invocation,
647 crate::RuntimeEffectCommand::AwaitEvent { key: key.clone() },
648 ),
649 crate::RuntimeEffectLocalExecutor::await_process_event(
650 key,
651 registry,
652 process_id.to_string(),
653 event_type.to_string(),
654 event_ordinal,
655 cancellation,
656 ),
657 )
658 .await?;
659 outcome.into_await_event()
660 }
661
662 pub async fn signal_lashlang_process(
663 &self,
664 registry: Arc<dyn crate::ProcessRegistry>,
665 process_id: &str,
666 signal_name: &str,
667 signal_id: String,
668 payload: serde_json::Value,
669 ) -> Result<crate::ProcessEvent, crate::RuntimeEffectControllerError> {
670 let event_type = crate::process_signal_event_type(signal_name)?;
671 let replay_key = format!("process:{process_id}:signal.{signal_name}:{signal_id}");
672 let command = crate::ProcessCommand::Signal {
673 process_id: process_id.to_string(),
674 signal_name: signal_name.to_string(),
675 signal_id,
676 request: crate::ProcessEventAppendRequest::new(event_type, payload)
677 .with_replay_key(replay_key),
678 };
679 let effect_id = command.effect_id();
680 let invocation = crate::runtime::causal::process_effect_invocation(
681 &self.session_id,
682 self.parent_invocation.clone(),
683 &effect_id,
684 );
685 let outcome = self
686 .dispatch
687 .effect_controller
688 .controller()
689 .execute_effect(
690 crate::RuntimeEffectEnvelope::new(
691 invocation,
692 crate::RuntimeEffectCommand::process(command),
693 ),
694 crate::RuntimeEffectLocalExecutor::process_control(registry),
695 )
696 .await?;
697 match outcome.into_process()? {
698 crate::ProcessEffectOutcome::Signal { event } => Ok(event),
699 other => Err(crate::RuntimeEffectControllerError::new(
700 "runtime_effect_wrong_outcome",
701 format!("expected signal outcome, got {other:?}"),
702 )),
703 }
704 }
705}
706
707#[cfg(test)]
708mod tests {
709 use super::*;
710 use crate::tool_dispatch::ToolDispatchContext;
711 use crate::{ToolCall, ToolProvider, ToolResult};
712
713 struct NoopTools;
714
715 #[async_trait::async_trait]
716 impl ToolProvider for NoopTools {
717 fn tool_manifests(&self) -> Vec<crate::ToolManifest> {
718 Vec::new()
719 }
720
721 fn resolve_contract(&self, _name: &str) -> Option<Arc<crate::ToolContract>> {
722 None
723 }
724
725 async fn execute(&self, _call: ToolCall<'_>) -> ToolResult {
726 ToolResult::err_fmt("not used")
727 }
728 }
729
730 #[test]
731 fn tool_argument_projection_policy_resolves_from_active_surface_and_defaults_unknown() {
732 let tool = crate::ToolDefinition::raw(
733 "tool:seedy",
734 "seedy",
735 "Seed-aware",
736 crate::ToolDefinition::default_input_schema(),
737 serde_json::json!({ "type": "string" }),
738 )
739 .with_argument_projection(
740 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed"),
741 );
742 let plugins = crate::plugin::PluginHost::empty()
743 .build_session("session", None)
744 .expect("plugin session");
745 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
746 let dispatch = Arc::new(ToolDispatchContext {
747 plugins,
748 tools: Arc::new(NoopTools),
749 surface: Arc::new(crate::ToolSurface::from_tools(
750 vec![tool.manifest()],
751 std::collections::BTreeMap::new(),
752 )),
753 sessions: Arc::new(crate::testing::MockSessionManager::default()),
754 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
755 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
756 processes: Arc::new(crate::UnavailableProcessService),
757 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
758 host_event_router: None,
759 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
760 crate::InlineRuntimeEffectController,
761 )),
762 direct_completions: crate::DirectCompletionClient::unavailable(
763 "direct completions are unavailable in this test context",
764 ),
765 parent_invocation: None,
766 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
767 crate::PluginOptions::default(),
768 crate::SessionPolicy::default(),
769 ),
770 session_id: "session".to_string(),
771 agent_frame_id: String::new(),
772 event_tx,
773 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
774 host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
775 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
776 turn_context: crate::TurnContext::default(),
777 });
778 let ctx = RuntimeExecutionContext::new(
779 "session".to_string(),
780 dispatch,
781 Default::default(),
782 Default::default(),
783 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
784 Arc::new(crate::InMemoryAttachmentStore::new()),
785 Arc::new(crate::ChronologicalProjection::default()),
786 None,
787 crate::TurnContext::default(),
788 );
789
790 assert_eq!(
791 ctx.tool_argument_projection_policy("seedy"),
792 crate::ToolArgumentProjectionPolicy::preserve_projected_refs_in_field("seed")
793 );
794 assert_eq!(
795 ctx.tool_argument_projection_policy("missing"),
796 crate::ToolArgumentProjectionPolicy::MaterializeProjectedValues
797 );
798 }
799
800 #[tokio::test]
801 async fn prepare_lashlang_process_start_captures_tool_ids_and_explicit_input() {
802 let tool = crate::ToolDefinition::raw(
803 "tool:alpha",
804 "alpha",
805 "Alpha tool.",
806 crate::ToolDefinition::default_input_schema(),
807 serde_json::json!({ "type": "object", "additionalProperties": true }),
808 );
809 let plugins = crate::plugin::PluginHost::empty()
810 .build_session("session", None)
811 .expect("plugin session");
812 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
813 let dispatch = Arc::new(ToolDispatchContext {
814 plugins,
815 tools: Arc::new(NoopTools),
816 surface: Arc::new(crate::ToolSurface::from_tools(
817 vec![tool.manifest()],
818 std::collections::BTreeMap::new(),
819 )),
820 sessions: Arc::new(crate::testing::MockSessionManager::default()),
821 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
822 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
823 processes: Arc::new(crate::UnavailableProcessService),
824 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
825 host_event_router: None,
826 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
827 crate::InlineRuntimeEffectController,
828 )),
829 direct_completions: crate::DirectCompletionClient::unavailable(
830 "direct completions are unavailable in this test context",
831 ),
832 parent_invocation: None,
833 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
834 crate::PluginOptions::default(),
835 crate::SessionPolicy::default(),
836 ),
837 session_id: "session".to_string(),
838 agent_frame_id: String::new(),
839 event_tx,
840 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
841 host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
842 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
843 turn_context: crate::TurnContext::default(),
844 });
845 let ctx = RuntimeExecutionContext::new(
846 "session".to_string(),
847 dispatch,
848 lashlang::LashlangAbilities::default().with_processes(),
849 Default::default(),
850 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
851 Arc::new(crate::InMemoryAttachmentStore::new()),
852 Arc::new(crate::ChronologicalProjection::default()),
853 None,
854 crate::TurnContext::default(),
855 );
856 let mut input = lashlang::Record::new();
857 input.insert("root".to_string(), lashlang::Value::String(".".into()));
858 let linked = ctx
859 .link_lashlang_module(
860 lashlang::parse("process scan(root: str) { finish root }").expect("process module"),
861 )
862 .expect("link process module");
863 ctx.put_lashlang_module_artifact(&linked.artifact)
864 .await
865 .expect("store module artifact");
866 let process_ref = linked
867 .artifact
868 .process_ref("scan")
869 .expect("scan process ref")
870 .clone();
871 let (registration, label) = ctx
872 .prepare_lashlang_process_start(lashlang::ProcessStart {
873 module_ref: linked.module_ref.clone(),
874 process_ref,
875 required_surface_ref: linked.required_surface_ref.clone(),
876 process_name: "scan".to_string(),
877 args: input,
878 })
879 .await
880 .expect("process start should prepare");
881
882 assert_eq!(label.as_deref(), Some("scan"));
883 assert!(
884 registration
885 .event_types
886 .iter()
887 .any(|event_type| event_type.name == "process.wake")
888 );
889 let crate::ProcessInput::LashlangProcess {
890 args, process_name, ..
891 } = registration.input.as_ref()
892 else {
893 panic!("expected lashlang process input");
894 };
895 assert_eq!(process_name, "scan");
896 assert_eq!(args.get("root"), Some(&serde_json::json!(".")));
897 }
898
899 #[test]
900 fn lashlang_surface_reflects_host_abilities() {
901 let tool = crate::ToolDefinition::raw(
902 "tool:alpha",
903 "alpha",
904 "Alpha tool.",
905 crate::ToolDefinition::default_input_schema(),
906 serde_json::json!({ "type": "object", "additionalProperties": true }),
907 );
908 let plugins = crate::plugin::PluginHost::empty()
909 .build_session("session", None)
910 .expect("plugin session");
911 let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
912 let dispatch = Arc::new(ToolDispatchContext {
913 plugins,
914 tools: Arc::new(NoopTools),
915 surface: Arc::new(crate::ToolSurface::from_tools(
916 vec![tool.manifest()],
917 std::collections::BTreeMap::new(),
918 )),
919 sessions: Arc::new(crate::testing::MockSessionManager::default()),
920 session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
921 session_graph: Arc::new(crate::testing::MockSessionManager::default()),
922 processes: Arc::new(crate::UnavailableProcessService),
923 process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
924 host_event_router: None,
925 effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
926 crate::InlineRuntimeEffectController,
927 )),
928 direct_completions: crate::DirectCompletionClient::unavailable(
929 "direct completions are unavailable in this test context",
930 ),
931 parent_invocation: None,
932 execution_env_spec: crate::ProcessExecutionEnvSpec::new(
933 crate::PluginOptions::default(),
934 crate::SessionPolicy::default(),
935 ),
936 session_id: "session".to_string(),
937 agent_frame_id: String::new(),
938 event_tx,
939 checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
940 host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
941 attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
942 turn_context: crate::TurnContext::default(),
943 });
944 let ctx = RuntimeExecutionContext::new(
945 "session".to_string(),
946 dispatch,
947 lashlang::LashlangAbilities::default()
948 .with_sleep()
949 .with_processes()
950 .with_process_signals(),
951 Default::default(),
952 Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
953 Arc::new(crate::InMemoryAttachmentStore::new()),
954 Arc::new(crate::ChronologicalProjection::default()),
955 None,
956 crate::TurnContext::default(),
957 );
958
959 let surface = ctx.lashlang_surface();
960
961 assert!(std::ptr::eq(surface, ctx.lashlang_surface()));
962 assert!(surface.abilities.processes);
963 assert!(surface.abilities.sleep);
964 assert!(surface.abilities.process_signals);
965 assert!(!surface.abilities.triggers);
966 assert!(
967 surface
968 .resources
969 .resolve_operation("Tools", "alpha")
970 .is_some()
971 );
972 }
973
/// Checks that lashlang resources supplied through the plugin host end up on
/// the surface exposed by [`RuntimeExecutionContext::lashlang_surface`].
///
/// The test contributes a `clock.Alarm` trigger source constructor (taking an
/// object with a required string field `at`, and emitting a `clock.Tick`
/// object with a required string field `fired_at`), builds a context with the
/// `processes` and `triggers` abilities over an empty tool surface, and then
/// asserts that:
/// - the constructor resolves by its `["clock", "Alarm"]` path,
/// - the trigger source resolves by the name `"clock.Alarm"`,
/// - a program that registers the trigger links against the surface.
#[test]
fn lashlang_surface_reflects_host_resource_contributions() {
    // Argument type accepted by the `clock.Alarm` constructor.
    let alarm_input = lashlang::TypeExpr::Object(vec![lashlang::TypeField {
        name: "at".into(),
        ty: lashlang::TypeExpr::Str,
        optional: false,
    }]);
    // Named object type passed as the last argument of the trigger source registration.
    let tick_event = lashlang::NamedDataType::object(
        "clock.Tick",
        vec![lashlang::TypeField {
            name: "fired_at".into(),
            ty: lashlang::TypeExpr::Str,
            optional: false,
        }],
    )
    .expect("valid clock tick type");

    let mut host_catalog = lashlang::ResourceCatalog::new();
    host_catalog
        .add_trigger_source_constructor(["clock", "Alarm"], alarm_input, tick_event)
        .expect("valid clock trigger source");

    // Layer the contribution on top of whatever the empty host already carries,
    // then hand the combined catalog back to the host before building a session.
    let host = crate::plugin::PluginHost::empty();
    let mut combined = host.lashlang_resources();
    combined.extend(host_catalog);
    let plugin_session = host
        .with_lashlang_resources(combined)
        .build_session("session", None)
        .expect("plugin session");

    // The receiver is bound (not `_`) so it stays alive until the test returns.
    let (event_tx, _event_rx) = tokio::sync::mpsc::channel(1);
    // No tools are registered here: the tool surface is built from empty inputs.
    let empty_tool_surface =
        crate::ToolSurface::from_tools(Vec::new(), std::collections::BTreeMap::new());
    let dispatch = Arc::new(ToolDispatchContext {
        plugins: plugin_session,
        tools: Arc::new(NoopTools),
        surface: Arc::new(empty_tool_surface),
        sessions: Arc::new(crate::testing::MockSessionManager::default()),
        session_lifecycle: Arc::new(crate::testing::MockSessionManager::default()),
        session_graph: Arc::new(crate::testing::MockSessionManager::default()),
        processes: Arc::new(crate::UnavailableProcessService),
        process_cancel_ability: Arc::new(crate::DefaultProcessCancelAbility),
        host_event_router: None,
        effect_controller: crate::runtime::RuntimeEffectControllerHandle::shared(Arc::new(
            crate::InlineRuntimeEffectController,
        )),
        direct_completions: crate::DirectCompletionClient::unavailable(
            "direct completions are unavailable in this test context",
        ),
        parent_invocation: None,
        execution_env_spec: crate::ProcessExecutionEnvSpec::new(
            crate::PluginOptions::default(),
            crate::SessionPolicy::default(),
        ),
        session_id: String::from("session"),
        agent_frame_id: String::new(),
        event_tx,
        checkpoint_messages: crate::tool_dispatch::CheckpointMessageBuffer::default(),
        host_event_outcomes: crate::tool_dispatch::ToolHostEventOutcomeBuffer::default(),
        attachment_store: Arc::new(crate::InMemoryAttachmentStore::new()),
        turn_context: crate::TurnContext::default(),
    });

    // Abilities requested for this context: `processes` and `triggers`.
    let abilities = lashlang::LashlangAbilities::default()
        .with_processes()
        .with_triggers();
    let ctx = RuntimeExecutionContext::new(
        String::from("session"),
        dispatch,
        abilities,
        Default::default(),
        Arc::new(lashlang::InMemoryLashlangArtifactStore::new()),
        Arc::new(crate::InMemoryAttachmentStore::new()),
        Arc::new(crate::ChronologicalProjection::default()),
        None,
        crate::TurnContext::default(),
    );

    let surface = ctx.lashlang_surface();

    // The contribution must be reachable both as a value constructor and as a
    // trigger source.
    assert!(
        surface
            .resources
            .resolve_value_constructor(&["clock", "Alarm"])
            .is_some()
    );
    assert!(
        surface
            .resources
            .resolve_trigger_source("clock.Alarm")
            .is_some()
    );

    // End-to-end check: a program using the contributed names must link.
    let program = lashlang::parse(
        r#"
            process remember(tick: clock.Tick) {
                finish true
            }

            source = clock.Alarm({ at: "08:00" })
            await triggers.register({
                source: source,
                target: remember,
                inputs: { tick: trigger.event }
            })?
            "#,
    )
    .expect("parse trigger registry module");
    lashlang::LinkedModule::link(program, surface)
        .expect("host resource contribution should be linkable");
}
1084}