Skip to main content

lash_core/runtime/process/
validation.rs

1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6    ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7    ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
/// Runtime facts consulted while validating a process execution environment.
#[derive(Clone, Copy, Debug)]
pub(crate) struct ProcessEnvValidationRuntime {
    /// Whether a process registry is available; passed through to
    /// `lashlang_abilities_for_process_registry` when the validation
    /// environment is rebuilt.
    pub(crate) process_registry_available: bool,
}
18
19pub(crate) async fn validate_lashlang_process_execution_env(
20    artifact_store: &dyn lashlang::LashlangArtifactStore,
21    plugin_host: &crate::PluginHost,
22    session_id: &str,
23    runtime: ProcessEnvValidationRuntime,
24    input: &super::model::ProcessInput,
25    env_spec: &super::model::ProcessExecutionEnvSpec,
26) -> Result<(), PluginError> {
27    let super::model::ProcessInput::LashlangProcess {
28        module_ref,
29        process_ref,
30        host_requirements_ref,
31        process_name,
32        ..
33    } = input
34    else {
35        return Ok(());
36    };
37
38    let artifact = artifact_store
39        .get_module_artifact(module_ref)
40        .await
41        .map_err(|err| {
42            PluginError::Session(format!(
43                "failed to load lashlang module artifact `{module_ref}` while validating process environment: {err}"
44            ))
45        })?
46        .ok_or_else(|| {
47            PluginError::Session(format!(
48                "missing lashlang module artifact `{module_ref}` while validating process environment"
49            ))
50        })?;
51    if artifact.host_requirements_ref != *host_requirements_ref {
52        return Err(PluginError::Session(format!(
53            "lashlang process `{process_name}` requested host requirements {}, artifact has {}",
54            host_requirements_ref, artifact.host_requirements_ref
55        )));
56    }
57    if artifact.process_ref(process_name) != Some(process_ref) {
58        return Err(PluginError::Session(format!(
59            "lashlang module `{module_ref}` does not export process `{process_name}` as requested ref {:?}",
60            process_ref
61        )));
62    }
63
64    let host = plugin_host.clone().with_lashlang_abilities(
65        crate::runtime::builder::lashlang_abilities_for_process_registry(
66            plugin_host.lashlang_abilities(),
67            runtime.process_registry_available,
68        ),
69    );
70    let plugins = host
71        .isolated_registry()
72        .build_session_with_parent(
73            session_id.to_string(),
74            None,
75            None,
76            crate::plugin::SessionAuthorityContext {
77                plugin_options: env_spec.plugin_options.clone(),
78                ..Default::default()
79            },
80        )
81        .map_err(|err| {
82            PluginError::Session(format!(
83                "failed to rebuild process environment plugin options for `{process_name}`: {err}"
84            ))
85        })?;
86    let tool_catalog = plugins.resolved_tool_catalog(session_id)?;
87    let lashlang_abilities = crate::runtime::builder::lashlang_abilities_for_process_registry(
88        plugins.lashlang_abilities(),
89        runtime.process_registry_available,
90    );
91    let current_environment = crate::session::lashlang_host_environment_from_tool_catalog(
92        &tool_catalog,
93        lashlang_abilities,
94        plugins.lashlang_language_features(),
95        plugins.lashlang_resources(),
96    );
97    lashlang_host_environment_satisfies_requirements(
98        &artifact.host_requirements,
99        &current_environment,
100    )
101    .map_err(|err| {
102        PluginError::Session(format!(
103            "lashlang process `{process_name}` is incompatible with captured process environment: {err}"
104        ))
105    })
106}
107
108pub(crate) fn lashlang_host_environment_satisfies_requirements(
109    required: &lashlang::HostRequirements,
110    current: &lashlang::LashlangHostEnvironment,
111) -> Result<(), String> {
112    let abilities = required.abilities;
113    let current_abilities = current.abilities;
114    if abilities.processes && !current_abilities.processes {
115        return Err("processes are not available".to_string());
116    }
117    if abilities.sleep && !current_abilities.sleep {
118        return Err("sleep is not available".to_string());
119    }
120    if abilities.process_signals && !current_abilities.process_signals {
121        return Err("process signals are not available".to_string());
122    }
123    if abilities.triggers && !current_abilities.triggers {
124        return Err("triggers are not available".to_string());
125    }
126    if required.language_features.label_annotations && !current.language_features.label_annotations
127    {
128        return Err("label annotations are not available".to_string());
129    }
130
131    for (_, module) in required.resources.module_instances() {
132        let current_module = current
133            .resources
134            .resolve_module_path(&module.path)
135            .ok_or_else(|| format!("module `{}` is not available", module.alias))?;
136        if current_module.resource_type != module.resource_type {
137            return Err(format!(
138                "module `{}` has type `{}`, expected `{}`",
139                module.alias, current_module.resource_type, module.resource_type
140            ));
141        }
142        for (operation, required_binding) in &module.operations {
143            match current.resources.resolve_module_operation(
144                &module.resource_type,
145                &module.alias,
146                operation,
147            ) {
148                Some(current_binding) if current_binding == required_binding => {}
149                Some(current_binding) => {
150                    return Err(format!(
151                        "module `{}` operation `{operation}` resolves to `{}`, expected `{}`",
152                        module.alias,
153                        current_binding.host_operation,
154                        required_binding.host_operation
155                    ));
156                }
157                None => {
158                    return Err(format!(
159                        "module `{}` does not expose operation `{operation}`",
160                        module.alias
161                    ));
162                }
163            }
164        }
165    }
166
167    for (resource_type, required_type) in required.resources.resource_types() {
168        if !current.resources.has_resource_type(resource_type) {
169            return Err(format!("resource type `{resource_type}` is not available"));
170        }
171        for (operation, required_binding) in &required_type.operations {
172            let current_binding = current
173                .resources
174                .resolve_operation(resource_type, operation)
175                .ok_or_else(|| {
176                    format!(
177                        "resource type `{resource_type}` does not expose operation `{operation}`"
178                    )
179                })?;
180            if current_binding.input_ty != required_binding.input_ty {
181                return Err(format!(
182                    "resource type `{resource_type}` operation `{operation}` has incompatible input type"
183                ));
184            }
185            if current_binding.output_ty != required_binding.output_ty {
186                return Err(format!(
187                    "resource type `{resource_type}` operation `{operation}` has incompatible output type"
188                ));
189            }
190        }
191    }
192    for (name, required_data_type) in required.resources.named_data_types() {
193        let current_data_type = current
194            .resources
195            .resolve_named_data_type(name)
196            .ok_or_else(|| format!("host data type `{name}` is not available"))?;
197        if current_data_type != required_data_type {
198            return Err(format!(
199                "host data type `{name}` has incompatible structure"
200            ));
201        }
202    }
203    for (path, required_binding) in required.resources.value_constructors() {
204        let current_binding = current
205            .resources
206            .resolve_value_constructor(&path.split('.').collect::<Vec<_>>())
207            .ok_or_else(|| format!("value constructor `{path}` is not available"))?;
208        if current_binding.input_ty != required_binding.input_ty {
209            return Err(format!(
210                "value constructor `{path}` has incompatible input type"
211            ));
212        }
213        if current_binding.output_ty != required_binding.output_ty {
214            return Err(format!(
215                "value constructor `{path}` has incompatible output type"
216            ));
217        }
218    }
219    for (source_ty, required_binding) in required.resources.trigger_sources() {
220        let current_binding = current
221            .resources
222            .resolve_trigger_source(source_ty)
223            .ok_or_else(|| format!("trigger source type `{source_ty}` is not available"))?;
224        if current_binding != required_binding {
225            return Err(format!(
226                "trigger source type `{source_ty}` has incompatible event type"
227            ));
228        }
229    }
230
231    Ok(())
232}
233
234#[derive(Clone, Debug)]
235pub struct PreparedProcessEventAppend {
236    pub event: ProcessEvent,
237    pub payload_hash: String,
238    pub status_update: Option<ProcessStatus>,
239    pub wake_delivery: Option<ProcessWakeDelivery>,
240    pub occurred_at_ms: u64,
241    pub replayed: bool,
242}
243
244pub fn prepare_process_event_append(
245    record: &ProcessRecord,
246    request: ProcessEventAppendRequest,
247    sequence: u64,
248    replay_lookup: Option<(String, ProcessEvent)>,
249    occurred_at_ms: u64,
250) -> Result<PreparedProcessEventAppend, PluginError> {
251    let process_id = record.id.as_str();
252    let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
253    if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
254        && let Some((existing_hash, existing)) = replay_lookup
255    {
256        if existing_hash == payload_hash {
257            let status_update = existing.semantics.terminal.clone().and_then(|terminal| {
258                (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
259            });
260            let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
261            let wake_delivery = prepare_wake_delivery(
262                process_id,
263                record,
264                existing.sequence,
265                existing.event_type.clone(),
266                existing.invocation.clone(),
267                existing.occurred_at,
268                existing.semantics.wake.clone(),
269                request
270                    .wake_target_scope
271                    .clone()
272                    .or_else(|| record.wake_target.clone()),
273            )?;
274            return Ok(PreparedProcessEventAppend {
275                event: existing,
276                payload_hash,
277                status_update,
278                wake_delivery,
279                occurred_at_ms,
280                replayed: true,
281            });
282        }
283        return Err(PluginError::Session(format!(
284            "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
285        )));
286    }
287    let declared = record
288        .event_types
289        .iter()
290        .find(|declared| declared.name == request.event_type)
291        .ok_or_else(|| {
292            PluginError::Session(format!(
293                "process `{process_id}` emitted undeclared event type `{}`",
294                request.event_type
295            ))
296        })?;
297    require_event_replay(process_id, &request, &declared.semantics)?;
298    declared
299        .payload_schema
300        .validate(&request.payload)
301        .map_err(|err| {
302            PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
303        })?;
304    let semantics = materialize_process_event_semantics(
305        process_id,
306        sequence,
307        &request.payload,
308        &declared.semantics,
309    )?;
310    if semantics.terminal.is_some() && record.is_terminal() {
311        return Err(PluginError::Session(format!(
312            "process `{process_id}` is already terminal"
313        )));
314    }
315    let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
316    let event = ProcessEvent {
317        process_id: process_id.to_string(),
318        sequence,
319        event_type: request.event_type,
320        payload: request.payload,
321        invocation: crate::runtime::causal::process_event_invocation(
322            process_id,
323            sequence,
324            declared.name.as_str(),
325            request.replay,
326        ),
327        semantics: semantics.clone(),
328        occurred_at,
329    };
330    let wake_delivery = prepare_wake_delivery(
331        process_id,
332        record,
333        event.sequence,
334        event.event_type.clone(),
335        event.invocation.clone(),
336        event.occurred_at,
337        semantics.wake.clone(),
338        request
339            .wake_target_scope
340            .or_else(|| record.wake_target.clone()),
341    )?;
342    Ok(PreparedProcessEventAppend {
343        event,
344        payload_hash,
345        status_update: semantics.terminal.map(ProcessStatus::from_terminal),
346        wake_delivery,
347        occurred_at_ms,
348        replayed: false,
349    })
350}
351
352#[expect(
353    clippy::too_many_arguments,
354    reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
355)]
356fn prepare_wake_delivery(
357    process_id: &str,
358    record: &ProcessRecord,
359    sequence: u64,
360    event_type: String,
361    event_invocation: crate::RuntimeInvocation,
362    occurred_at: std::time::SystemTime,
363    wake: Option<super::events::ProcessWake>,
364    wake_target_scope: Option<super::model::SessionScope>,
365) -> Result<Option<ProcessWakeDelivery>, PluginError> {
366    let Some(wake) = wake else {
367        return Ok(None);
368    };
369    let Some(target_scope) = wake_target_scope else {
370        return Ok(None);
371    };
372    process_wake_delivery(ProcessWakeDeliveryRequest {
373        target_scope,
374        process_id: process_id.to_string(),
375        sequence,
376        event_type,
377        event_invocation,
378        process_caused_by: record.provenance.caused_by.clone(),
379        wake,
380        occurred_at,
381    })
382    .map(Some)
383}
384
385pub fn prepare_process_registration(
386    mut registration: ProcessRegistration,
387) -> Result<(ProcessRegistration, String), PluginError> {
388    ensure_core_event_types(&mut registration);
389    validate_process_registration(&registration)?;
390    let registration_hash = process_registration_hash(&registration)?;
391    Ok((registration, registration_hash))
392}
393
394pub fn process_registration_hash(
395    registration: &ProcessRegistration,
396) -> Result<String, PluginError> {
397    crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
398        PluginError::Session(format!(
399            "failed to hash process `{}` registration: {err}",
400            registration.id
401        ))
402    })
403}
404
405pub fn process_event_payload_hash(
406    event_type: &str,
407    payload: &serde_json::Value,
408) -> Result<String, PluginError> {
409    crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
410        PluginError::Session(format!(
411            "failed to hash `{event_type}` process event: {err}"
412        ))
413    })
414}
415
416pub fn require_event_replay(
417    process_id: &str,
418    request: &ProcessEventAppendRequest,
419    spec: &ProcessEventSemanticsSpec,
420) -> Result<(), PluginError> {
421    let requires_key =
422        spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
423    if requires_key
424        && request
425            .replay
426            .as_ref()
427            .is_none_or(|replay| replay.key.is_empty())
428    {
429        return Err(PluginError::Session(format!(
430            "process `{process_id}` event `{}` requires a deterministic replay key",
431            request.event_type
432        )));
433    }
434    Ok(())
435}
436
437pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
438    let mut existing = registration
439        .event_types
440        .iter()
441        .map(|event_type| event_type.name.clone())
442        .collect::<HashSet<_>>();
443    for event_type in default_process_event_types() {
444        if existing.insert(event_type.name.clone()) {
445            registration.event_types.push(event_type);
446        }
447    }
448}
449
450pub(super) fn validate_process_registration(
451    registration: &ProcessRegistration,
452) -> Result<(), PluginError> {
453    if registration.id.trim().is_empty() {
454        return Err(PluginError::Session(
455            "process id must be a non-empty string".to_string(),
456        ));
457    }
458    match registration.input.as_ref() {
459        super::model::ProcessInput::ToolCall { .. }
460        | super::model::ProcessInput::LashlangProcess { .. } => {
461            if registration.env_ref.is_none() {
462                return Err(PluginError::Session(format!(
463                    "process `{}` requires a captured execution env",
464                    registration.id
465                )));
466            }
467        }
468        super::model::ProcessInput::External { .. }
469        | super::model::ProcessInput::SessionTurn { .. } => {
470            if registration.env_ref.is_some() {
471                return Err(PluginError::Session(format!(
472                    "process `{}` must not capture an execution env for this input kind",
473                    registration.id
474                )));
475            }
476        }
477    }
478    let mut names = HashSet::new();
479    for event_type in &registration.event_types {
480        if event_type.name.trim().is_empty() {
481            return Err(PluginError::Session(format!(
482                "process `{}` declares an empty event type",
483                registration.id
484            )));
485        }
486        if !names.insert(event_type.name.as_str()) {
487            return Err(PluginError::Session(format!(
488                "process `{}` declares duplicate event type `{}`",
489                registration.id, event_type.name
490            )));
491        }
492        if let Some(terminal) = &event_type.semantics.terminal
493            && terminal.state != ProcessTerminalState::Completed
494            && terminal.await_output.is_none()
495        {
496            return Err(PluginError::Session(format!(
497                "terminal event `{}` for process `{}` must declare await output",
498                event_type.name, registration.id
499            )));
500        }
501    }
502    Ok(())
503}