lash_core/runtime/process/
validation.rs1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6 ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7 ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::system_time_from_epoch_ms;
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub struct PreparedProcessEventAppend {
16 pub event: ProcessEvent,
17 pub payload_hash: String,
18 pub status_update: Option<ProcessStatus>,
19 pub wake_delivery: Option<ProcessWakeDelivery>,
20 pub occurred_at_ms: u64,
21 pub replayed: bool,
22}
23
24pub fn prepare_process_event_append(
25 record: &ProcessRecord,
26 request: ProcessEventAppendRequest,
27 sequence: u64,
28 replay_lookup: Option<(String, ProcessEvent)>,
29 occurred_at_ms: u64,
30) -> Result<PreparedProcessEventAppend, PluginError> {
31 let process_id = record.id.as_str();
32 let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
33 if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
34 && let Some((existing_hash, existing)) = replay_lookup
35 {
36 if existing_hash == payload_hash {
37 let wake_delivery = prepare_wake_delivery(
38 process_id,
39 record,
40 existing.sequence,
41 existing.event_type.clone(),
42 existing.invocation.clone(),
43 existing.occurred_at,
44 existing.semantics.wake.clone(),
45 request.wake_target_scope.clone(),
46 )?;
47 return Ok(PreparedProcessEventAppend {
48 event: existing,
49 payload_hash,
50 status_update: None,
51 wake_delivery,
52 occurred_at_ms,
53 replayed: true,
54 });
55 }
56 return Err(PluginError::Session(format!(
57 "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
58 )));
59 }
60 let declared = record
61 .event_types
62 .iter()
63 .find(|declared| declared.name == request.event_type)
64 .ok_or_else(|| {
65 PluginError::Session(format!(
66 "process `{process_id}` emitted undeclared event type `{}`",
67 request.event_type
68 ))
69 })?;
70 require_event_replay(process_id, &request, &declared.semantics)?;
71 declared
72 .payload_schema
73 .validate(&request.payload)
74 .map_err(|err| {
75 PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
76 })?;
77 let semantics = materialize_process_event_semantics(
78 process_id,
79 sequence,
80 &request.payload,
81 &declared.semantics,
82 )?;
83 if semantics.terminal.is_some() && record.is_terminal() {
84 return Err(PluginError::Session(format!(
85 "process `{process_id}` is already terminal"
86 )));
87 }
88 let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
89 let event = ProcessEvent {
90 process_id: process_id.to_string(),
91 sequence,
92 event_type: request.event_type,
93 payload: request.payload,
94 invocation: crate::runtime::causal::process_event_invocation(
95 &record.provenance.owner_scope.session_id,
96 process_id,
97 sequence,
98 declared.name.as_str(),
99 request.replay,
100 ),
101 semantics: semantics.clone(),
102 occurred_at,
103 };
104 let wake_delivery = prepare_wake_delivery(
105 process_id,
106 record,
107 event.sequence,
108 event.event_type.clone(),
109 event.invocation.clone(),
110 event.occurred_at,
111 semantics.wake.clone(),
112 request.wake_target_scope,
113 )?;
114 Ok(PreparedProcessEventAppend {
115 event,
116 payload_hash,
117 status_update: semantics.terminal.map(ProcessStatus::from_terminal),
118 wake_delivery,
119 occurred_at_ms,
120 replayed: false,
121 })
122}
123
124#[expect(
125 clippy::too_many_arguments,
126 reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
127)]
128fn prepare_wake_delivery(
129 process_id: &str,
130 record: &ProcessRecord,
131 sequence: u64,
132 event_type: String,
133 event_invocation: crate::RuntimeInvocation,
134 occurred_at: std::time::SystemTime,
135 wake: Option<super::events::ProcessWake>,
136 wake_target_scope: Option<super::model::ProcessScope>,
137) -> Result<Option<ProcessWakeDelivery>, PluginError> {
138 let Some(wake) = wake else {
139 return Ok(None);
140 };
141 let Some(target_scope) = wake_target_scope else {
142 return Err(PluginError::Session(format!(
143 "process `{process_id}` emitted wake event `{event_type}` without a wake target scope"
144 )));
145 };
146 process_wake_delivery(ProcessWakeDeliveryRequest {
147 target_scope,
148 process_id: process_id.to_string(),
149 sequence,
150 event_type,
151 event_invocation,
152 process_caused_by: record.provenance.caused_by.clone(),
153 wake,
154 occurred_at,
155 })
156 .map(Some)
157}
158
159pub fn prepare_process_registration(
160 mut registration: ProcessRegistration,
161) -> Result<(ProcessRegistration, String), PluginError> {
162 ensure_core_event_types(&mut registration);
163 validate_process_registration(®istration)?;
164 let registration_hash = process_registration_hash(®istration)?;
165 Ok((registration, registration_hash))
166}
167
168pub fn process_registration_hash(
169 registration: &ProcessRegistration,
170) -> Result<String, PluginError> {
171 crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
172 PluginError::Session(format!(
173 "failed to hash process `{}` registration: {err}",
174 registration.id
175 ))
176 })
177}
178
179pub fn process_event_payload_hash(
180 event_type: &str,
181 payload: &serde_json::Value,
182) -> Result<String, PluginError> {
183 crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
184 PluginError::Session(format!(
185 "failed to hash `{event_type}` process event: {err}"
186 ))
187 })
188}
189
190pub fn require_event_replay(
191 process_id: &str,
192 request: &ProcessEventAppendRequest,
193 spec: &ProcessEventSemanticsSpec,
194) -> Result<(), PluginError> {
195 let requires_key =
196 spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
197 if requires_key
198 && request
199 .replay
200 .as_ref()
201 .is_none_or(|replay| replay.key.is_empty())
202 {
203 return Err(PluginError::Session(format!(
204 "process `{process_id}` event `{}` requires a deterministic replay key",
205 request.event_type
206 )));
207 }
208 Ok(())
209}
210
211pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
212 let mut existing = registration
213 .event_types
214 .iter()
215 .map(|event_type| event_type.name.clone())
216 .collect::<HashSet<_>>();
217 for event_type in default_process_event_types() {
218 if existing.insert(event_type.name.clone()) {
219 registration.event_types.push(event_type);
220 }
221 }
222}
223
224pub(super) fn validate_process_registration(
225 registration: &ProcessRegistration,
226) -> Result<(), PluginError> {
227 if registration.id.trim().is_empty() {
228 return Err(PluginError::Session(
229 "process id must be a non-empty string".to_string(),
230 ));
231 }
232 if registration.provenance.owner_scope.is_empty() {
233 return Err(PluginError::Session(format!(
234 "process `{}` owner scope must include a session id",
235 registration.id
236 )));
237 }
238 if registration.provenance.host_profile_id.trim().is_empty() {
239 return Err(PluginError::Session(format!(
240 "process `{}` host profile id must be non-empty",
241 registration.id
242 )));
243 }
244 let mut names = HashSet::new();
245 for event_type in ®istration.event_types {
246 if event_type.name.trim().is_empty() {
247 return Err(PluginError::Session(format!(
248 "process `{}` declares an empty event type",
249 registration.id
250 )));
251 }
252 if !names.insert(event_type.name.as_str()) {
253 return Err(PluginError::Session(format!(
254 "process `{}` declares duplicate event type `{}`",
255 registration.id, event_type.name
256 )));
257 }
258 if let Some(terminal) = &event_type.semantics.terminal
259 && terminal.state != ProcessTerminalState::Completed
260 && terminal.await_output.is_none()
261 {
262 return Err(PluginError::Session(format!(
263 "terminal event `{}` for process `{}` must declare await output",
264 event_type.name, registration.id
265 )));
266 }
267 }
268 Ok(())
269}