1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6 ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7 ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Copy, Debug)]
15pub(crate) struct ProcessEnvValidationRuntime {
16 pub(crate) process_registry_available: bool,
17}
18
19pub(crate) async fn validate_lashlang_process_execution_env(
20 artifact_store: &dyn lashlang::LashlangArtifactStore,
21 plugin_host: &crate::PluginHost,
22 session_id: &str,
23 runtime: ProcessEnvValidationRuntime,
24 input: &super::model::ProcessInput,
25 env_spec: &super::model::ProcessExecutionEnvSpec,
26) -> Result<(), PluginError> {
27 let super::model::ProcessInput::LashlangProcess {
28 module_ref,
29 process_ref,
30 host_requirements_ref,
31 process_name,
32 ..
33 } = input
34 else {
35 return Ok(());
36 };
37
38 let artifact = artifact_store
39 .get_module_artifact(module_ref)
40 .await
41 .map_err(|err| {
42 PluginError::Session(format!(
43 "failed to load lashlang module artifact `{module_ref}` while validating process environment: {err}"
44 ))
45 })?
46 .ok_or_else(|| {
47 PluginError::Session(format!(
48 "missing lashlang module artifact `{module_ref}` while validating process environment"
49 ))
50 })?;
51 if artifact.host_requirements_ref != *host_requirements_ref {
52 return Err(PluginError::Session(format!(
53 "lashlang process `{process_name}` requested host requirements {}, artifact has {}",
54 host_requirements_ref, artifact.host_requirements_ref
55 )));
56 }
57 if artifact.process_ref(process_name) != Some(process_ref) {
58 return Err(PluginError::Session(format!(
59 "lashlang module `{module_ref}` does not export process `{process_name}` as requested ref {:?}",
60 process_ref
61 )));
62 }
63
64 let host = plugin_host.clone().with_lashlang_abilities(
65 crate::runtime::builder::lashlang_abilities_for_process_registry(
66 plugin_host.lashlang_abilities(),
67 runtime.process_registry_available,
68 ),
69 );
70 let plugins = host
71 .isolated_registry()
72 .build_session_with_parent(
73 session_id.to_string(),
74 None,
75 None,
76 crate::plugin::SessionAuthorityContext {
77 plugin_options: env_spec.plugin_options.clone(),
78 ..Default::default()
79 },
80 )
81 .map_err(|err| {
82 PluginError::Session(format!(
83 "failed to rebuild process environment plugin options for `{process_name}`: {err}"
84 ))
85 })?;
86 let tool_catalog = plugins.resolved_tool_catalog(session_id)?;
87 let lashlang_abilities = crate::runtime::builder::lashlang_abilities_for_process_registry(
88 plugins.lashlang_abilities(),
89 runtime.process_registry_available,
90 );
91 let current_environment = crate::session::lashlang_host_environment_from_tool_catalog(
92 &tool_catalog,
93 lashlang_abilities,
94 plugins.lashlang_language_features(),
95 plugins.lashlang_resources(),
96 );
97 lashlang_host_environment_satisfies_requirements(
98 &artifact.host_requirements,
99 ¤t_environment,
100 )
101 .map_err(|err| {
102 PluginError::Session(format!(
103 "lashlang process `{process_name}` is incompatible with captured process environment: {err}"
104 ))
105 })
106}
107
108pub(crate) fn lashlang_host_environment_satisfies_requirements(
109 required: &lashlang::HostRequirements,
110 current: &lashlang::LashlangHostEnvironment,
111) -> Result<(), String> {
112 let abilities = required.abilities;
113 let current_abilities = current.abilities;
114 if abilities.processes && !current_abilities.processes {
115 return Err("processes are not available".to_string());
116 }
117 if abilities.sleep && !current_abilities.sleep {
118 return Err("sleep is not available".to_string());
119 }
120 if abilities.process_signals && !current_abilities.process_signals {
121 return Err("process signals are not available".to_string());
122 }
123 if abilities.triggers && !current_abilities.triggers {
124 return Err("triggers are not available".to_string());
125 }
126 if required.language_features.label_annotations && !current.language_features.label_annotations
127 {
128 return Err("label annotations are not available".to_string());
129 }
130
131 for (_, module) in required.resources.module_instances() {
132 let current_module = current
133 .resources
134 .resolve_module_path(&module.path)
135 .ok_or_else(|| format!("module `{}` is not available", module.alias))?;
136 if current_module.resource_type != module.resource_type {
137 return Err(format!(
138 "module `{}` has type `{}`, expected `{}`",
139 module.alias, current_module.resource_type, module.resource_type
140 ));
141 }
142 for (operation, required_binding) in &module.operations {
143 match current.resources.resolve_module_operation(
144 &module.resource_type,
145 &module.alias,
146 operation,
147 ) {
148 Some(current_binding) if current_binding == required_binding => {}
149 Some(current_binding) => {
150 return Err(format!(
151 "module `{}` operation `{operation}` resolves to `{}`, expected `{}`",
152 module.alias,
153 current_binding.host_operation,
154 required_binding.host_operation
155 ));
156 }
157 None => {
158 return Err(format!(
159 "module `{}` does not expose operation `{operation}`",
160 module.alias
161 ));
162 }
163 }
164 }
165 }
166
167 for (resource_type, required_type) in required.resources.resource_types() {
168 if !current.resources.has_resource_type(resource_type) {
169 return Err(format!("resource type `{resource_type}` is not available"));
170 }
171 for (operation, required_binding) in &required_type.operations {
172 let current_binding = current
173 .resources
174 .resolve_operation(resource_type, operation)
175 .ok_or_else(|| {
176 format!(
177 "resource type `{resource_type}` does not expose operation `{operation}`"
178 )
179 })?;
180 if current_binding.input_ty != required_binding.input_ty {
181 return Err(format!(
182 "resource type `{resource_type}` operation `{operation}` has incompatible input type"
183 ));
184 }
185 if current_binding.output_ty != required_binding.output_ty {
186 return Err(format!(
187 "resource type `{resource_type}` operation `{operation}` has incompatible output type"
188 ));
189 }
190 }
191 }
192 for (name, required_data_type) in required.resources.named_data_types() {
193 let current_data_type = current
194 .resources
195 .resolve_named_data_type(name)
196 .ok_or_else(|| format!("host data type `{name}` is not available"))?;
197 if current_data_type != required_data_type {
198 return Err(format!(
199 "host data type `{name}` has incompatible structure"
200 ));
201 }
202 }
203 for (path, required_binding) in required.resources.value_constructors() {
204 let current_binding = current
205 .resources
206 .resolve_value_constructor(&path.split('.').collect::<Vec<_>>())
207 .ok_or_else(|| format!("value constructor `{path}` is not available"))?;
208 if current_binding.input_ty != required_binding.input_ty {
209 return Err(format!(
210 "value constructor `{path}` has incompatible input type"
211 ));
212 }
213 if current_binding.output_ty != required_binding.output_ty {
214 return Err(format!(
215 "value constructor `{path}` has incompatible output type"
216 ));
217 }
218 }
219 for (source_ty, required_binding) in required.resources.trigger_sources() {
220 let current_binding = current
221 .resources
222 .resolve_trigger_source(source_ty)
223 .ok_or_else(|| format!("trigger source type `{source_ty}` is not available"))?;
224 if current_binding != required_binding {
225 return Err(format!(
226 "trigger source type `{source_ty}` has incompatible event type"
227 ));
228 }
229 }
230
231 Ok(())
232}
233
234#[derive(Clone, Debug)]
235pub struct PreparedProcessEventAppend {
236 pub event: ProcessEvent,
237 pub payload_hash: String,
238 pub status_update: Option<ProcessStatus>,
239 pub wake_delivery: Option<ProcessWakeDelivery>,
240 pub occurred_at_ms: u64,
241 pub replayed: bool,
242}
243
244pub fn prepare_process_event_append(
245 record: &ProcessRecord,
246 request: ProcessEventAppendRequest,
247 sequence: u64,
248 replay_lookup: Option<(String, ProcessEvent)>,
249 occurred_at_ms: u64,
250) -> Result<PreparedProcessEventAppend, PluginError> {
251 let process_id = record.id.as_str();
252 let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
253 if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
254 && let Some((existing_hash, existing)) = replay_lookup
255 {
256 if existing_hash == payload_hash {
257 let status_update = existing.semantics.terminal.clone().and_then(|terminal| {
258 (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
259 });
260 let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
261 let wake_delivery = prepare_wake_delivery(
262 process_id,
263 record,
264 existing.sequence,
265 existing.event_type.clone(),
266 existing.invocation.clone(),
267 existing.occurred_at,
268 existing.semantics.wake.clone(),
269 request
270 .wake_target_scope
271 .clone()
272 .or_else(|| record.wake_target.clone()),
273 )?;
274 return Ok(PreparedProcessEventAppend {
275 event: existing,
276 payload_hash,
277 status_update,
278 wake_delivery,
279 occurred_at_ms,
280 replayed: true,
281 });
282 }
283 return Err(PluginError::Session(format!(
284 "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
285 )));
286 }
287 let declared = record
288 .event_types
289 .iter()
290 .find(|declared| declared.name == request.event_type)
291 .ok_or_else(|| {
292 PluginError::Session(format!(
293 "process `{process_id}` emitted undeclared event type `{}`",
294 request.event_type
295 ))
296 })?;
297 require_event_replay(process_id, &request, &declared.semantics)?;
298 declared
299 .payload_schema
300 .validate(&request.payload)
301 .map_err(|err| {
302 PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
303 })?;
304 let semantics = materialize_process_event_semantics(
305 process_id,
306 sequence,
307 &request.payload,
308 &declared.semantics,
309 )?;
310 if semantics.terminal.is_some() && record.is_terminal() {
311 return Err(PluginError::Session(format!(
312 "process `{process_id}` is already terminal"
313 )));
314 }
315 let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
316 let event = ProcessEvent {
317 process_id: process_id.to_string(),
318 sequence,
319 event_type: request.event_type,
320 payload: request.payload,
321 invocation: crate::runtime::causal::process_event_invocation(
322 process_id,
323 sequence,
324 declared.name.as_str(),
325 request.replay,
326 ),
327 semantics: semantics.clone(),
328 occurred_at,
329 };
330 let wake_delivery = prepare_wake_delivery(
331 process_id,
332 record,
333 event.sequence,
334 event.event_type.clone(),
335 event.invocation.clone(),
336 event.occurred_at,
337 semantics.wake.clone(),
338 request
339 .wake_target_scope
340 .or_else(|| record.wake_target.clone()),
341 )?;
342 Ok(PreparedProcessEventAppend {
343 event,
344 payload_hash,
345 status_update: semantics.terminal.map(ProcessStatus::from_terminal),
346 wake_delivery,
347 occurred_at_ms,
348 replayed: false,
349 })
350}
351
352#[expect(
353 clippy::too_many_arguments,
354 reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
355)]
356fn prepare_wake_delivery(
357 process_id: &str,
358 record: &ProcessRecord,
359 sequence: u64,
360 event_type: String,
361 event_invocation: crate::RuntimeInvocation,
362 occurred_at: std::time::SystemTime,
363 wake: Option<super::events::ProcessWake>,
364 wake_target_scope: Option<super::model::SessionScope>,
365) -> Result<Option<ProcessWakeDelivery>, PluginError> {
366 let Some(wake) = wake else {
367 return Ok(None);
368 };
369 let Some(target_scope) = wake_target_scope else {
370 return Ok(None);
371 };
372 process_wake_delivery(ProcessWakeDeliveryRequest {
373 target_scope,
374 process_id: process_id.to_string(),
375 sequence,
376 event_type,
377 event_invocation,
378 process_caused_by: record.provenance.caused_by.clone(),
379 wake,
380 occurred_at,
381 })
382 .map(Some)
383}
384
385pub fn prepare_process_registration(
386 mut registration: ProcessRegistration,
387) -> Result<(ProcessRegistration, String), PluginError> {
388 ensure_core_event_types(&mut registration);
389 validate_process_registration(®istration)?;
390 let registration_hash = process_registration_hash(®istration)?;
391 Ok((registration, registration_hash))
392}
393
394pub fn process_registration_hash(
395 registration: &ProcessRegistration,
396) -> Result<String, PluginError> {
397 crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
398 PluginError::Session(format!(
399 "failed to hash process `{}` registration: {err}",
400 registration.id
401 ))
402 })
403}
404
405pub fn process_event_payload_hash(
406 event_type: &str,
407 payload: &serde_json::Value,
408) -> Result<String, PluginError> {
409 crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
410 PluginError::Session(format!(
411 "failed to hash `{event_type}` process event: {err}"
412 ))
413 })
414}
415
416pub fn require_event_replay(
417 process_id: &str,
418 request: &ProcessEventAppendRequest,
419 spec: &ProcessEventSemanticsSpec,
420) -> Result<(), PluginError> {
421 let requires_key =
422 spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
423 if requires_key
424 && request
425 .replay
426 .as_ref()
427 .is_none_or(|replay| replay.key.is_empty())
428 {
429 return Err(PluginError::Session(format!(
430 "process `{process_id}` event `{}` requires a deterministic replay key",
431 request.event_type
432 )));
433 }
434 Ok(())
435}
436
437pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
438 let mut existing = registration
439 .event_types
440 .iter()
441 .map(|event_type| event_type.name.clone())
442 .collect::<HashSet<_>>();
443 for event_type in default_process_event_types() {
444 if existing.insert(event_type.name.clone()) {
445 registration.event_types.push(event_type);
446 }
447 }
448}
449
450pub(super) fn validate_process_registration(
451 registration: &ProcessRegistration,
452) -> Result<(), PluginError> {
453 if registration.id.trim().is_empty() {
454 return Err(PluginError::Session(
455 "process id must be a non-empty string".to_string(),
456 ));
457 }
458 match registration.input.as_ref() {
459 super::model::ProcessInput::ToolCall { .. }
460 | super::model::ProcessInput::LashlangProcess { .. } => {
461 if registration.env_ref.is_none() {
462 return Err(PluginError::Session(format!(
463 "process `{}` requires a captured execution env",
464 registration.id
465 )));
466 }
467 }
468 super::model::ProcessInput::External { .. }
469 | super::model::ProcessInput::SessionTurn { .. } => {
470 if registration.env_ref.is_some() {
471 return Err(PluginError::Session(format!(
472 "process `{}` must not capture an execution env for this input kind",
473 registration.id
474 )));
475 }
476 }
477 }
478 let mut names = HashSet::new();
479 for event_type in ®istration.event_types {
480 if event_type.name.trim().is_empty() {
481 return Err(PluginError::Session(format!(
482 "process `{}` declares an empty event type",
483 registration.id
484 )));
485 }
486 if !names.insert(event_type.name.as_str()) {
487 return Err(PluginError::Session(format!(
488 "process `{}` declares duplicate event type `{}`",
489 registration.id, event_type.name
490 )));
491 }
492 if let Some(terminal) = &event_type.semantics.terminal
493 && terminal.state != ProcessTerminalState::Completed
494 && terminal.await_output.is_none()
495 {
496 return Err(PluginError::Session(format!(
497 "terminal event `{}` for process `{}` must declare await output",
498 event_type.name, registration.id
499 )));
500 }
501 }
502 Ok(())
503}