lash_core/runtime/process/
validation.rs1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6 ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7 ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub struct PreparedProcessEventAppend {
16 pub event: ProcessEvent,
17 pub payload_hash: String,
18 pub status_update: Option<ProcessStatus>,
19 pub wake_delivery: Option<ProcessWakeDelivery>,
20 pub occurred_at_ms: u64,
21 pub replayed: bool,
22}
23
24pub fn prepare_process_event_append(
25 record: &ProcessRecord,
26 request: ProcessEventAppendRequest,
27 sequence: u64,
28 replay_lookup: Option<(String, ProcessEvent)>,
29 occurred_at_ms: u64,
30) -> Result<PreparedProcessEventAppend, PluginError> {
31 let process_id = record.id.as_str();
32 let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
33 if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
34 && let Some((existing_hash, existing)) = replay_lookup
35 {
36 if existing_hash == payload_hash {
37 let status_update = existing.semantics.terminal.clone().and_then(|terminal| {
38 (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
39 });
40 let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
41 let wake_delivery = prepare_wake_delivery(
42 process_id,
43 record,
44 existing.sequence,
45 existing.event_type.clone(),
46 existing.invocation.clone(),
47 existing.occurred_at,
48 existing.semantics.wake.clone(),
49 request.wake_target_scope.clone(),
50 )?;
51 return Ok(PreparedProcessEventAppend {
52 event: existing,
53 payload_hash,
54 status_update,
55 wake_delivery,
56 occurred_at_ms,
57 replayed: true,
58 });
59 }
60 return Err(PluginError::Session(format!(
61 "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
62 )));
63 }
64 let declared = record
65 .event_types
66 .iter()
67 .find(|declared| declared.name == request.event_type)
68 .ok_or_else(|| {
69 PluginError::Session(format!(
70 "process `{process_id}` emitted undeclared event type `{}`",
71 request.event_type
72 ))
73 })?;
74 require_event_replay(process_id, &request, &declared.semantics)?;
75 declared
76 .payload_schema
77 .validate(&request.payload)
78 .map_err(|err| {
79 PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
80 })?;
81 let semantics = materialize_process_event_semantics(
82 process_id,
83 sequence,
84 &request.payload,
85 &declared.semantics,
86 )?;
87 if semantics.terminal.is_some() && record.is_terminal() {
88 return Err(PluginError::Session(format!(
89 "process `{process_id}` is already terminal"
90 )));
91 }
92 let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
93 let event = ProcessEvent {
94 process_id: process_id.to_string(),
95 sequence,
96 event_type: request.event_type,
97 payload: request.payload,
98 invocation: crate::runtime::causal::process_event_invocation(
99 &record.provenance.owner_scope.session_id,
100 process_id,
101 sequence,
102 declared.name.as_str(),
103 request.replay,
104 ),
105 semantics: semantics.clone(),
106 occurred_at,
107 };
108 let wake_delivery = prepare_wake_delivery(
109 process_id,
110 record,
111 event.sequence,
112 event.event_type.clone(),
113 event.invocation.clone(),
114 event.occurred_at,
115 semantics.wake.clone(),
116 request.wake_target_scope,
117 )?;
118 Ok(PreparedProcessEventAppend {
119 event,
120 payload_hash,
121 status_update: semantics.terminal.map(ProcessStatus::from_terminal),
122 wake_delivery,
123 occurred_at_ms,
124 replayed: false,
125 })
126}
127
128#[expect(
129 clippy::too_many_arguments,
130 reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
131)]
132fn prepare_wake_delivery(
133 process_id: &str,
134 record: &ProcessRecord,
135 sequence: u64,
136 event_type: String,
137 event_invocation: crate::RuntimeInvocation,
138 occurred_at: std::time::SystemTime,
139 wake: Option<super::events::ProcessWake>,
140 wake_target_scope: Option<super::model::ProcessScope>,
141) -> Result<Option<ProcessWakeDelivery>, PluginError> {
142 let Some(wake) = wake else {
143 return Ok(None);
144 };
145 let Some(target_scope) = wake_target_scope else {
146 return Err(PluginError::Session(format!(
147 "process `{process_id}` emitted wake event `{event_type}` without a wake target scope"
148 )));
149 };
150 process_wake_delivery(ProcessWakeDeliveryRequest {
151 target_scope,
152 process_id: process_id.to_string(),
153 sequence,
154 event_type,
155 event_invocation,
156 process_caused_by: record.provenance.caused_by.clone(),
157 wake,
158 occurred_at,
159 })
160 .map(Some)
161}
162
163pub fn prepare_process_registration(
164 mut registration: ProcessRegistration,
165) -> Result<(ProcessRegistration, String), PluginError> {
166 ensure_core_event_types(&mut registration);
167 validate_process_registration(®istration)?;
168 let registration_hash = process_registration_hash(®istration)?;
169 Ok((registration, registration_hash))
170}
171
172pub fn process_registration_hash(
173 registration: &ProcessRegistration,
174) -> Result<String, PluginError> {
175 crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
176 PluginError::Session(format!(
177 "failed to hash process `{}` registration: {err}",
178 registration.id
179 ))
180 })
181}
182
183pub fn process_event_payload_hash(
184 event_type: &str,
185 payload: &serde_json::Value,
186) -> Result<String, PluginError> {
187 crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
188 PluginError::Session(format!(
189 "failed to hash `{event_type}` process event: {err}"
190 ))
191 })
192}
193
194pub fn require_event_replay(
195 process_id: &str,
196 request: &ProcessEventAppendRequest,
197 spec: &ProcessEventSemanticsSpec,
198) -> Result<(), PluginError> {
199 let requires_key =
200 spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
201 if requires_key
202 && request
203 .replay
204 .as_ref()
205 .is_none_or(|replay| replay.key.is_empty())
206 {
207 return Err(PluginError::Session(format!(
208 "process `{process_id}` event `{}` requires a deterministic replay key",
209 request.event_type
210 )));
211 }
212 Ok(())
213}
214
215pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
216 let mut existing = registration
217 .event_types
218 .iter()
219 .map(|event_type| event_type.name.clone())
220 .collect::<HashSet<_>>();
221 for event_type in default_process_event_types() {
222 if existing.insert(event_type.name.clone()) {
223 registration.event_types.push(event_type);
224 }
225 }
226}
227
228pub(super) fn validate_process_registration(
229 registration: &ProcessRegistration,
230) -> Result<(), PluginError> {
231 if registration.id.trim().is_empty() {
232 return Err(PluginError::Session(
233 "process id must be a non-empty string".to_string(),
234 ));
235 }
236 if registration.provenance.owner_scope.is_empty() {
237 return Err(PluginError::Session(format!(
238 "process `{}` owner scope must include a session id",
239 registration.id
240 )));
241 }
242 if registration.provenance.host_profile_id.trim().is_empty() {
243 return Err(PluginError::Session(format!(
244 "process `{}` host profile id must be non-empty",
245 registration.id
246 )));
247 }
248 let mut names = HashSet::new();
249 for event_type in ®istration.event_types {
250 if event_type.name.trim().is_empty() {
251 return Err(PluginError::Session(format!(
252 "process `{}` declares an empty event type",
253 registration.id
254 )));
255 }
256 if !names.insert(event_type.name.as_str()) {
257 return Err(PluginError::Session(format!(
258 "process `{}` declares duplicate event type `{}`",
259 registration.id, event_type.name
260 )));
261 }
262 if let Some(terminal) = &event_type.semantics.terminal
263 && terminal.state != ProcessTerminalState::Completed
264 && terminal.await_output.is_none()
265 {
266 return Err(PluginError::Session(format!(
267 "terminal event `{}` for process `{}` must declare await output",
268 event_type.name, registration.id
269 )));
270 }
271 }
272 Ok(())
273}