Skip to main content

lash_core/runtime/process/
validation.rs

1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6    ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7    ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub struct PreparedProcessEventAppend {
16    pub event: ProcessEvent,
17    pub payload_hash: String,
18    pub status_update: Option<ProcessStatus>,
19    pub wake_delivery: Option<ProcessWakeDelivery>,
20    pub occurred_at_ms: u64,
21    pub replayed: bool,
22}
23
24pub fn prepare_process_event_append(
25    record: &ProcessRecord,
26    request: ProcessEventAppendRequest,
27    sequence: u64,
28    replay_lookup: Option<(String, ProcessEvent)>,
29    occurred_at_ms: u64,
30) -> Result<PreparedProcessEventAppend, PluginError> {
31    let process_id = record.id.as_str();
32    let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
33    if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
34        && let Some((existing_hash, existing)) = replay_lookup
35    {
36        if existing_hash == payload_hash {
37            let status_update = existing.semantics.terminal.clone().and_then(|terminal| {
38                (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
39            });
40            let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
41            let wake_delivery = prepare_wake_delivery(
42                process_id,
43                record,
44                existing.sequence,
45                existing.event_type.clone(),
46                existing.invocation.clone(),
47                existing.occurred_at,
48                existing.semantics.wake.clone(),
49                request
50                    .wake_target_scope
51                    .clone()
52                    .or_else(|| record.wake_target.clone()),
53            )?;
54            return Ok(PreparedProcessEventAppend {
55                event: existing,
56                payload_hash,
57                status_update,
58                wake_delivery,
59                occurred_at_ms,
60                replayed: true,
61            });
62        }
63        return Err(PluginError::Session(format!(
64            "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
65        )));
66    }
67    let declared = record
68        .event_types
69        .iter()
70        .find(|declared| declared.name == request.event_type)
71        .ok_or_else(|| {
72            PluginError::Session(format!(
73                "process `{process_id}` emitted undeclared event type `{}`",
74                request.event_type
75            ))
76        })?;
77    require_event_replay(process_id, &request, &declared.semantics)?;
78    declared
79        .payload_schema
80        .validate(&request.payload)
81        .map_err(|err| {
82            PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
83        })?;
84    let semantics = materialize_process_event_semantics(
85        process_id,
86        sequence,
87        &request.payload,
88        &declared.semantics,
89    )?;
90    if semantics.terminal.is_some() && record.is_terminal() {
91        return Err(PluginError::Session(format!(
92            "process `{process_id}` is already terminal"
93        )));
94    }
95    let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
96    let event = ProcessEvent {
97        process_id: process_id.to_string(),
98        sequence,
99        event_type: request.event_type,
100        payload: request.payload,
101        invocation: crate::runtime::causal::process_event_invocation(
102            process_id,
103            sequence,
104            declared.name.as_str(),
105            request.replay,
106        ),
107        semantics: semantics.clone(),
108        occurred_at,
109    };
110    let wake_delivery = prepare_wake_delivery(
111        process_id,
112        record,
113        event.sequence,
114        event.event_type.clone(),
115        event.invocation.clone(),
116        event.occurred_at,
117        semantics.wake.clone(),
118        request
119            .wake_target_scope
120            .or_else(|| record.wake_target.clone()),
121    )?;
122    Ok(PreparedProcessEventAppend {
123        event,
124        payload_hash,
125        status_update: semantics.terminal.map(ProcessStatus::from_terminal),
126        wake_delivery,
127        occurred_at_ms,
128        replayed: false,
129    })
130}
131
132#[expect(
133    clippy::too_many_arguments,
134    reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
135)]
136fn prepare_wake_delivery(
137    process_id: &str,
138    record: &ProcessRecord,
139    sequence: u64,
140    event_type: String,
141    event_invocation: crate::RuntimeInvocation,
142    occurred_at: std::time::SystemTime,
143    wake: Option<super::events::ProcessWake>,
144    wake_target_scope: Option<super::model::SessionScope>,
145) -> Result<Option<ProcessWakeDelivery>, PluginError> {
146    let Some(wake) = wake else {
147        return Ok(None);
148    };
149    let Some(target_scope) = wake_target_scope else {
150        return Ok(None);
151    };
152    process_wake_delivery(ProcessWakeDeliveryRequest {
153        target_scope,
154        process_id: process_id.to_string(),
155        sequence,
156        event_type,
157        event_invocation,
158        process_caused_by: record.provenance.caused_by.clone(),
159        wake,
160        occurred_at,
161    })
162    .map(Some)
163}
164
165pub fn prepare_process_registration(
166    mut registration: ProcessRegistration,
167) -> Result<(ProcessRegistration, String), PluginError> {
168    ensure_core_event_types(&mut registration);
169    validate_process_registration(&registration)?;
170    let registration_hash = process_registration_hash(&registration)?;
171    Ok((registration, registration_hash))
172}
173
174pub fn process_registration_hash(
175    registration: &ProcessRegistration,
176) -> Result<String, PluginError> {
177    crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
178        PluginError::Session(format!(
179            "failed to hash process `{}` registration: {err}",
180            registration.id
181        ))
182    })
183}
184
185pub fn process_event_payload_hash(
186    event_type: &str,
187    payload: &serde_json::Value,
188) -> Result<String, PluginError> {
189    crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
190        PluginError::Session(format!(
191            "failed to hash `{event_type}` process event: {err}"
192        ))
193    })
194}
195
196pub fn require_event_replay(
197    process_id: &str,
198    request: &ProcessEventAppendRequest,
199    spec: &ProcessEventSemanticsSpec,
200) -> Result<(), PluginError> {
201    let requires_key =
202        spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
203    if requires_key
204        && request
205            .replay
206            .as_ref()
207            .is_none_or(|replay| replay.key.is_empty())
208    {
209        return Err(PluginError::Session(format!(
210            "process `{process_id}` event `{}` requires a deterministic replay key",
211            request.event_type
212        )));
213    }
214    Ok(())
215}
216
217pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
218    let mut existing = registration
219        .event_types
220        .iter()
221        .map(|event_type| event_type.name.clone())
222        .collect::<HashSet<_>>();
223    for event_type in default_process_event_types() {
224        if existing.insert(event_type.name.clone()) {
225            registration.event_types.push(event_type);
226        }
227    }
228}
229
230pub(super) fn validate_process_registration(
231    registration: &ProcessRegistration,
232) -> Result<(), PluginError> {
233    if registration.id.trim().is_empty() {
234        return Err(PluginError::Session(
235            "process id must be a non-empty string".to_string(),
236        ));
237    }
238    if registration.provenance.host_profile_id.trim().is_empty() {
239        return Err(PluginError::Session(format!(
240            "process `{}` host profile id must be non-empty",
241            registration.id
242        )));
243    }
244    match registration.input.as_ref() {
245        super::model::ProcessInput::ToolCall { .. }
246        | super::model::ProcessInput::LashlangProcess { .. } => {
247            if registration.env_ref.is_none() {
248                return Err(PluginError::Session(format!(
249                    "process `{}` requires a captured execution env",
250                    registration.id
251                )));
252            }
253        }
254        super::model::ProcessInput::External { .. }
255        | super::model::ProcessInput::SessionTurn { .. } => {
256            if registration.env_ref.is_some() {
257                return Err(PluginError::Session(format!(
258                    "process `{}` must not capture an execution env for this input kind",
259                    registration.id
260                )));
261            }
262        }
263    }
264    let mut names = HashSet::new();
265    for event_type in &registration.event_types {
266        if event_type.name.trim().is_empty() {
267            return Err(PluginError::Session(format!(
268                "process `{}` declares an empty event type",
269                registration.id
270            )));
271        }
272        if !names.insert(event_type.name.as_str()) {
273            return Err(PluginError::Session(format!(
274                "process `{}` declares duplicate event type `{}`",
275                registration.id, event_type.name
276            )));
277        }
278        if let Some(terminal) = &event_type.semantics.terminal
279            && terminal.state != ProcessTerminalState::Completed
280            && terminal.await_output.is_none()
281        {
282            return Err(PluginError::Session(format!(
283                "terminal event `{}` for process `{}` must declare await output",
284                event_type.name, registration.id
285            )));
286        }
287    }
288    Ok(())
289}