Skip to main content

lash_core/runtime/process/
validation.rs

1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6    ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7    ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub struct PreparedProcessEventAppend {
16    pub event: ProcessEvent,
17    pub payload_hash: String,
18    pub status_update: Option<ProcessStatus>,
19    pub wake_delivery: Option<ProcessWakeDelivery>,
20    pub occurred_at_ms: u64,
21    pub replayed: bool,
22}
23
24pub fn prepare_process_event_append(
25    record: &ProcessRecord,
26    request: ProcessEventAppendRequest,
27    sequence: u64,
28    replay_lookup: Option<(String, ProcessEvent)>,
29    occurred_at_ms: u64,
30) -> Result<PreparedProcessEventAppend, PluginError> {
31    let process_id = record.id.as_str();
32    let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
33    if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
34        && let Some((existing_hash, existing)) = replay_lookup
35    {
36        if existing_hash == payload_hash {
37            let status_update = existing.semantics.terminal.clone().and_then(|terminal| {
38                (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
39            });
40            let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
41            let wake_delivery = prepare_wake_delivery(
42                process_id,
43                record,
44                existing.sequence,
45                existing.event_type.clone(),
46                existing.invocation.clone(),
47                existing.occurred_at,
48                existing.semantics.wake.clone(),
49                request.wake_target_scope.clone(),
50            )?;
51            return Ok(PreparedProcessEventAppend {
52                event: existing,
53                payload_hash,
54                status_update,
55                wake_delivery,
56                occurred_at_ms,
57                replayed: true,
58            });
59        }
60        return Err(PluginError::Session(format!(
61            "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
62        )));
63    }
64    let declared = record
65        .event_types
66        .iter()
67        .find(|declared| declared.name == request.event_type)
68        .ok_or_else(|| {
69            PluginError::Session(format!(
70                "process `{process_id}` emitted undeclared event type `{}`",
71                request.event_type
72            ))
73        })?;
74    require_event_replay(process_id, &request, &declared.semantics)?;
75    declared
76        .payload_schema
77        .validate(&request.payload)
78        .map_err(|err| {
79            PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
80        })?;
81    let semantics = materialize_process_event_semantics(
82        process_id,
83        sequence,
84        &request.payload,
85        &declared.semantics,
86    )?;
87    if semantics.terminal.is_some() && record.is_terminal() {
88        return Err(PluginError::Session(format!(
89            "process `{process_id}` is already terminal"
90        )));
91    }
92    let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
93    let event = ProcessEvent {
94        process_id: process_id.to_string(),
95        sequence,
96        event_type: request.event_type,
97        payload: request.payload,
98        invocation: crate::runtime::causal::process_event_invocation(
99            &record.provenance.owner_scope.session_id,
100            process_id,
101            sequence,
102            declared.name.as_str(),
103            request.replay,
104        ),
105        semantics: semantics.clone(),
106        occurred_at,
107    };
108    let wake_delivery = prepare_wake_delivery(
109        process_id,
110        record,
111        event.sequence,
112        event.event_type.clone(),
113        event.invocation.clone(),
114        event.occurred_at,
115        semantics.wake.clone(),
116        request.wake_target_scope,
117    )?;
118    Ok(PreparedProcessEventAppend {
119        event,
120        payload_hash,
121        status_update: semantics.terminal.map(ProcessStatus::from_terminal),
122        wake_delivery,
123        occurred_at_ms,
124        replayed: false,
125    })
126}
127
128#[expect(
129    clippy::too_many_arguments,
130    reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
131)]
132fn prepare_wake_delivery(
133    process_id: &str,
134    record: &ProcessRecord,
135    sequence: u64,
136    event_type: String,
137    event_invocation: crate::RuntimeInvocation,
138    occurred_at: std::time::SystemTime,
139    wake: Option<super::events::ProcessWake>,
140    wake_target_scope: Option<super::model::ProcessScope>,
141) -> Result<Option<ProcessWakeDelivery>, PluginError> {
142    let Some(wake) = wake else {
143        return Ok(None);
144    };
145    let Some(target_scope) = wake_target_scope else {
146        return Err(PluginError::Session(format!(
147            "process `{process_id}` emitted wake event `{event_type}` without a wake target scope"
148        )));
149    };
150    process_wake_delivery(ProcessWakeDeliveryRequest {
151        target_scope,
152        process_id: process_id.to_string(),
153        sequence,
154        event_type,
155        event_invocation,
156        process_caused_by: record.provenance.caused_by.clone(),
157        wake,
158        occurred_at,
159    })
160    .map(Some)
161}
162
163pub fn prepare_process_registration(
164    mut registration: ProcessRegistration,
165) -> Result<(ProcessRegistration, String), PluginError> {
166    ensure_core_event_types(&mut registration);
167    validate_process_registration(&registration)?;
168    let registration_hash = process_registration_hash(&registration)?;
169    Ok((registration, registration_hash))
170}
171
172pub fn process_registration_hash(
173    registration: &ProcessRegistration,
174) -> Result<String, PluginError> {
175    crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
176        PluginError::Session(format!(
177            "failed to hash process `{}` registration: {err}",
178            registration.id
179        ))
180    })
181}
182
183pub fn process_event_payload_hash(
184    event_type: &str,
185    payload: &serde_json::Value,
186) -> Result<String, PluginError> {
187    crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
188        PluginError::Session(format!(
189            "failed to hash `{event_type}` process event: {err}"
190        ))
191    })
192}
193
194pub fn require_event_replay(
195    process_id: &str,
196    request: &ProcessEventAppendRequest,
197    spec: &ProcessEventSemanticsSpec,
198) -> Result<(), PluginError> {
199    let requires_key =
200        spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
201    if requires_key
202        && request
203            .replay
204            .as_ref()
205            .is_none_or(|replay| replay.key.is_empty())
206    {
207        return Err(PluginError::Session(format!(
208            "process `{process_id}` event `{}` requires a deterministic replay key",
209            request.event_type
210        )));
211    }
212    Ok(())
213}
214
215pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
216    let mut existing = registration
217        .event_types
218        .iter()
219        .map(|event_type| event_type.name.clone())
220        .collect::<HashSet<_>>();
221    for event_type in default_process_event_types() {
222        if existing.insert(event_type.name.clone()) {
223            registration.event_types.push(event_type);
224        }
225    }
226}
227
228pub(super) fn validate_process_registration(
229    registration: &ProcessRegistration,
230) -> Result<(), PluginError> {
231    if registration.id.trim().is_empty() {
232        return Err(PluginError::Session(
233            "process id must be a non-empty string".to_string(),
234        ));
235    }
236    if registration.provenance.owner_scope.is_empty() {
237        return Err(PluginError::Session(format!(
238            "process `{}` owner scope must include a session id",
239            registration.id
240        )));
241    }
242    if registration.provenance.host_profile_id.trim().is_empty() {
243        return Err(PluginError::Session(format!(
244            "process `{}` host profile id must be non-empty",
245            registration.id
246        )));
247    }
248    let mut names = HashSet::new();
249    for event_type in &registration.event_types {
250        if event_type.name.trim().is_empty() {
251            return Err(PluginError::Session(format!(
252                "process `{}` declares an empty event type",
253                registration.id
254            )));
255        }
256        if !names.insert(event_type.name.as_str()) {
257            return Err(PluginError::Session(format!(
258                "process `{}` declares duplicate event type `{}`",
259                registration.id, event_type.name
260            )));
261        }
262        if let Some(terminal) = &event_type.semantics.terminal
263            && terminal.state != ProcessTerminalState::Completed
264            && terminal.await_output.is_none()
265        {
266            return Err(PluginError::Session(format!(
267                "terminal event `{}` for process `{}` must declare await output",
268                event_type.name, registration.id
269            )));
270        }
271    }
272    Ok(())
273}