Skip to main content

lash_core/runtime/process/
validation.rs

1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6    ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7    ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub enum ProcessEventAppendPlan {
16    Insert {
17        event: ProcessEvent,
18        payload_hash: String,
19        status_update: Option<ProcessStatus>,
20        wake_delivery: Option<ProcessWakeDelivery>,
21        occurred_at_ms: u64,
22    },
23    Replay {
24        event: ProcessEvent,
25        repair_status: Option<ProcessStatus>,
26        wake_delivery: Option<ProcessWakeDelivery>,
27        occurred_at_ms: u64,
28    },
29}
30
31pub fn apply_process_status_projection(
32    record: &mut ProcessRecord,
33    status: ProcessStatus,
34    updated_at_ms: u64,
35) {
36    record.status = status;
37    if record.status.is_terminal() {
38        record.wait = None;
39    }
40    record.updated_at_ms = updated_at_ms;
41}
42
43pub fn prepare_process_event_append(
44    record: &ProcessRecord,
45    request: ProcessEventAppendRequest,
46    sequence: u64,
47    replay_lookup: Option<(String, ProcessEvent)>,
48    occurred_at_ms: u64,
49) -> Result<ProcessEventAppendPlan, PluginError> {
50    let process_id = record.id.as_str();
51    let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
52    if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
53        && let Some((existing_hash, existing)) = replay_lookup
54    {
55        if existing_hash == payload_hash {
56            let repair_status = existing.semantics.terminal.clone().and_then(|terminal| {
57                (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
58            });
59            let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
60            let wake_delivery = prepare_wake_delivery(
61                process_id,
62                record,
63                existing.sequence,
64                existing.event_type.clone(),
65                existing.invocation.clone(),
66                existing.occurred_at,
67                existing.semantics.wake.clone(),
68                request
69                    .wake_target_scope
70                    .clone()
71                    .or_else(|| record.wake_target.clone()),
72            )?;
73            return Ok(ProcessEventAppendPlan::Replay {
74                event: existing,
75                repair_status,
76                wake_delivery,
77                occurred_at_ms,
78            });
79        }
80        return Err(PluginError::Session(format!(
81            "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
82        )));
83    }
84    let declared = record
85        .event_types
86        .iter()
87        .find(|declared| declared.name == request.event_type)
88        .ok_or_else(|| {
89            PluginError::Session(format!(
90                "process `{process_id}` emitted undeclared event type `{}`",
91                request.event_type
92            ))
93        })?;
94    require_event_replay(process_id, &request, &declared.semantics)?;
95    declared
96        .payload_schema
97        .validate(&request.payload)
98        .map_err(|err| {
99            PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
100        })?;
101    let semantics = materialize_process_event_semantics(
102        process_id,
103        sequence,
104        &request.payload,
105        &declared.semantics,
106    )?;
107    if semantics.terminal.is_some() && record.is_terminal() {
108        return Err(PluginError::Session(format!(
109            "process `{process_id}` is already terminal"
110        )));
111    }
112    let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
113    let event = ProcessEvent {
114        process_id: process_id.to_string(),
115        sequence,
116        event_type: request.event_type,
117        payload: request.payload,
118        invocation: crate::runtime::causal::process_event_invocation(
119            process_id,
120            sequence,
121            declared.name.as_str(),
122            request.replay,
123        ),
124        semantics: semantics.clone(),
125        occurred_at,
126    };
127    let wake_delivery = prepare_wake_delivery(
128        process_id,
129        record,
130        event.sequence,
131        event.event_type.clone(),
132        event.invocation.clone(),
133        event.occurred_at,
134        semantics.wake.clone(),
135        request
136            .wake_target_scope
137            .or_else(|| record.wake_target.clone()),
138    )?;
139    Ok(ProcessEventAppendPlan::Insert {
140        event,
141        payload_hash,
142        status_update: semantics.terminal.map(ProcessStatus::from_terminal),
143        wake_delivery,
144        occurred_at_ms,
145    })
146}
147
148#[expect(
149    clippy::too_many_arguments,
150    reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
151)]
152fn prepare_wake_delivery(
153    process_id: &str,
154    record: &ProcessRecord,
155    sequence: u64,
156    event_type: String,
157    event_invocation: crate::RuntimeInvocation,
158    occurred_at: std::time::SystemTime,
159    wake: Option<super::events::ProcessWake>,
160    wake_target_scope: Option<super::model::SessionScope>,
161) -> Result<Option<ProcessWakeDelivery>, PluginError> {
162    let Some(wake) = wake else {
163        return Ok(None);
164    };
165    let Some(target_scope) = wake_target_scope else {
166        return Ok(None);
167    };
168    process_wake_delivery(ProcessWakeDeliveryRequest {
169        target_scope,
170        process_id: process_id.to_string(),
171        sequence,
172        event_type,
173        event_invocation,
174        process_caused_by: record.provenance.caused_by.clone(),
175        wake,
176        occurred_at,
177    })
178    .map(Some)
179}
180
181pub fn prepare_process_registration(
182    mut registration: ProcessRegistration,
183) -> Result<(ProcessRegistration, String), PluginError> {
184    ensure_core_event_types(&mut registration);
185    validate_process_registration(&registration)?;
186    let registration_hash = process_registration_hash(&registration)?;
187    Ok((registration, registration_hash))
188}
189
190pub fn process_registration_hash(
191    registration: &ProcessRegistration,
192) -> Result<String, PluginError> {
193    crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
194        PluginError::Session(format!(
195            "failed to hash process `{}` registration: {err}",
196            registration.id
197        ))
198    })
199}
200
201pub fn process_event_payload_hash(
202    event_type: &str,
203    payload: &serde_json::Value,
204) -> Result<String, PluginError> {
205    crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
206        PluginError::Session(format!(
207            "failed to hash `{event_type}` process event: {err}"
208        ))
209    })
210}
211
212pub fn require_event_replay(
213    process_id: &str,
214    request: &ProcessEventAppendRequest,
215    spec: &ProcessEventSemanticsSpec,
216) -> Result<(), PluginError> {
217    let requires_key =
218        spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
219    if requires_key
220        && request
221            .replay
222            .as_ref()
223            .is_none_or(|replay| replay.key.is_empty())
224    {
225        return Err(PluginError::Session(format!(
226            "process `{process_id}` event `{}` requires a deterministic replay key",
227            request.event_type
228        )));
229    }
230    Ok(())
231}
232
233pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
234    let mut existing = registration
235        .event_types
236        .iter()
237        .map(|event_type| event_type.name.clone())
238        .collect::<HashSet<_>>();
239    for event_type in default_process_event_types() {
240        if existing.insert(event_type.name.clone()) {
241            registration.event_types.push(event_type);
242        }
243    }
244}
245
246pub(super) fn validate_process_registration(
247    registration: &ProcessRegistration,
248) -> Result<(), PluginError> {
249    if registration.id.trim().is_empty() {
250        return Err(PluginError::Session(
251            "process id must be a non-empty string".to_string(),
252        ));
253    }
254    match registration.input.as_ref() {
255        super::model::ProcessInput::ToolCall { .. } | super::model::ProcessInput::Engine { .. } => {
256            if registration.env_ref.is_none() {
257                return Err(PluginError::Session(format!(
258                    "process `{}` requires a captured execution env",
259                    registration.id
260                )));
261            }
262        }
263        super::model::ProcessInput::External { .. }
264        | super::model::ProcessInput::SessionTurn { .. } => {
265            if registration.env_ref.is_some() {
266                return Err(PluginError::Session(format!(
267                    "process `{}` must not capture an execution env for this input kind",
268                    registration.id
269                )));
270            }
271        }
272    }
273    let mut names = HashSet::new();
274    for event_type in &registration.event_types {
275        if event_type.name.trim().is_empty() {
276            return Err(PluginError::Session(format!(
277                "process `{}` declares an empty event type",
278                registration.id
279            )));
280        }
281        if !names.insert(event_type.name.as_str()) {
282            return Err(PluginError::Session(format!(
283                "process `{}` declares duplicate event type `{}`",
284                registration.id, event_type.name
285            )));
286        }
287        if let Some(terminal) = &event_type.semantics.terminal
288            && terminal.state != ProcessTerminalState::Completed
289            && terminal.await_output.is_none()
290        {
291            return Err(PluginError::Session(format!(
292                "terminal event `{}` for process `{}` must declare await output",
293                event_type.name, registration.id
294            )));
295        }
296    }
297    Ok(())
298}