Skip to main content

lash_core/runtime/process/
validation.rs

1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6    ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7    ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::system_time_from_epoch_ms;
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub struct PreparedProcessEventAppend {
16    pub event: ProcessEvent,
17    pub payload_hash: String,
18    pub status_update: Option<ProcessStatus>,
19    pub wake_delivery: Option<ProcessWakeDelivery>,
20    pub occurred_at_ms: u64,
21    pub replayed: bool,
22}
23
24pub fn prepare_process_event_append(
25    record: &ProcessRecord,
26    request: ProcessEventAppendRequest,
27    sequence: u64,
28    replay_lookup: Option<(String, ProcessEvent)>,
29    occurred_at_ms: u64,
30) -> Result<PreparedProcessEventAppend, PluginError> {
31    let process_id = record.id.as_str();
32    let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
33    if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
34        && let Some((existing_hash, existing)) = replay_lookup
35    {
36        if existing_hash == payload_hash {
37            let wake_delivery = prepare_wake_delivery(
38                process_id,
39                record,
40                existing.sequence,
41                existing.event_type.clone(),
42                existing.invocation.clone(),
43                existing.occurred_at,
44                existing.semantics.wake.clone(),
45                request.wake_target_scope.clone(),
46            )?;
47            return Ok(PreparedProcessEventAppend {
48                event: existing,
49                payload_hash,
50                status_update: None,
51                wake_delivery,
52                occurred_at_ms,
53                replayed: true,
54            });
55        }
56        return Err(PluginError::Session(format!(
57            "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
58        )));
59    }
60    let declared = record
61        .event_types
62        .iter()
63        .find(|declared| declared.name == request.event_type)
64        .ok_or_else(|| {
65            PluginError::Session(format!(
66                "process `{process_id}` emitted undeclared event type `{}`",
67                request.event_type
68            ))
69        })?;
70    require_event_replay(process_id, &request, &declared.semantics)?;
71    declared
72        .payload_schema
73        .validate(&request.payload)
74        .map_err(|err| {
75            PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
76        })?;
77    let semantics = materialize_process_event_semantics(
78        process_id,
79        sequence,
80        &request.payload,
81        &declared.semantics,
82    )?;
83    if semantics.terminal.is_some() && record.is_terminal() {
84        return Err(PluginError::Session(format!(
85            "process `{process_id}` is already terminal"
86        )));
87    }
88    let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
89    let event = ProcessEvent {
90        process_id: process_id.to_string(),
91        sequence,
92        event_type: request.event_type,
93        payload: request.payload,
94        invocation: crate::runtime::causal::process_event_invocation(
95            &record.provenance.owner_scope.session_id,
96            process_id,
97            sequence,
98            declared.name.as_str(),
99            request.replay,
100        ),
101        semantics: semantics.clone(),
102        occurred_at,
103    };
104    let wake_delivery = prepare_wake_delivery(
105        process_id,
106        record,
107        event.sequence,
108        event.event_type.clone(),
109        event.invocation.clone(),
110        event.occurred_at,
111        semantics.wake.clone(),
112        request.wake_target_scope,
113    )?;
114    Ok(PreparedProcessEventAppend {
115        event,
116        payload_hash,
117        status_update: semantics.terminal.map(ProcessStatus::from_terminal),
118        wake_delivery,
119        occurred_at_ms,
120        replayed: false,
121    })
122}
123
124#[expect(
125    clippy::too_many_arguments,
126    reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
127)]
128fn prepare_wake_delivery(
129    process_id: &str,
130    record: &ProcessRecord,
131    sequence: u64,
132    event_type: String,
133    event_invocation: crate::RuntimeInvocation,
134    occurred_at: std::time::SystemTime,
135    wake: Option<super::events::ProcessWake>,
136    wake_target_scope: Option<super::model::ProcessScope>,
137) -> Result<Option<ProcessWakeDelivery>, PluginError> {
138    let Some(wake) = wake else {
139        return Ok(None);
140    };
141    let Some(target_scope) = wake_target_scope else {
142        return Err(PluginError::Session(format!(
143            "process `{process_id}` emitted wake event `{event_type}` without a wake target scope"
144        )));
145    };
146    process_wake_delivery(ProcessWakeDeliveryRequest {
147        target_scope,
148        process_id: process_id.to_string(),
149        sequence,
150        event_type,
151        event_invocation,
152        process_caused_by: record.provenance.caused_by.clone(),
153        wake,
154        occurred_at,
155    })
156    .map(Some)
157}
158
159pub fn prepare_process_registration(
160    mut registration: ProcessRegistration,
161) -> Result<(ProcessRegistration, String), PluginError> {
162    ensure_core_event_types(&mut registration);
163    validate_process_registration(&registration)?;
164    let registration_hash = process_registration_hash(&registration)?;
165    Ok((registration, registration_hash))
166}
167
168pub fn process_registration_hash(
169    registration: &ProcessRegistration,
170) -> Result<String, PluginError> {
171    crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
172        PluginError::Session(format!(
173            "failed to hash process `{}` registration: {err}",
174            registration.id
175        ))
176    })
177}
178
179pub fn process_event_payload_hash(
180    event_type: &str,
181    payload: &serde_json::Value,
182) -> Result<String, PluginError> {
183    crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
184        PluginError::Session(format!(
185            "failed to hash `{event_type}` process event: {err}"
186        ))
187    })
188}
189
190pub fn require_event_replay(
191    process_id: &str,
192    request: &ProcessEventAppendRequest,
193    spec: &ProcessEventSemanticsSpec,
194) -> Result<(), PluginError> {
195    let requires_key =
196        spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
197    if requires_key
198        && request
199            .replay
200            .as_ref()
201            .is_none_or(|replay| replay.key.is_empty())
202    {
203        return Err(PluginError::Session(format!(
204            "process `{process_id}` event `{}` requires a deterministic replay key",
205            request.event_type
206        )));
207    }
208    Ok(())
209}
210
211pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
212    let mut existing = registration
213        .event_types
214        .iter()
215        .map(|event_type| event_type.name.clone())
216        .collect::<HashSet<_>>();
217    for event_type in default_process_event_types() {
218        if existing.insert(event_type.name.clone()) {
219            registration.event_types.push(event_type);
220        }
221    }
222}
223
224pub(super) fn validate_process_registration(
225    registration: &ProcessRegistration,
226) -> Result<(), PluginError> {
227    if registration.id.trim().is_empty() {
228        return Err(PluginError::Session(
229            "process id must be a non-empty string".to_string(),
230        ));
231    }
232    if registration.provenance.owner_scope.is_empty() {
233        return Err(PluginError::Session(format!(
234            "process `{}` owner scope must include a session id",
235            registration.id
236        )));
237    }
238    if registration.provenance.host_profile_id.trim().is_empty() {
239        return Err(PluginError::Session(format!(
240            "process `{}` host profile id must be non-empty",
241            registration.id
242        )));
243    }
244    let mut names = HashSet::new();
245    for event_type in &registration.event_types {
246        if event_type.name.trim().is_empty() {
247            return Err(PluginError::Session(format!(
248                "process `{}` declares an empty event type",
249                registration.id
250            )));
251        }
252        if !names.insert(event_type.name.as_str()) {
253            return Err(PluginError::Session(format!(
254                "process `{}` declares duplicate event type `{}`",
255                registration.id, event_type.name
256            )));
257        }
258        if let Some(terminal) = &event_type.semantics.terminal
259            && terminal.state != ProcessTerminalState::Completed
260            && terminal.await_output.is_none()
261        {
262            return Err(PluginError::Session(format!(
263                "terminal event `{}` for process `{}` must declare await output",
264                event_type.name, registration.id
265            )));
266        }
267    }
268    Ok(())
269}