// lash_core/runtime/process/validation.rs
use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6 ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7 ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub enum ProcessEventAppendPlan {
16 Insert {
17 event: ProcessEvent,
18 payload_hash: String,
19 status_update: Option<ProcessStatus>,
20 wake_delivery: Option<ProcessWakeDelivery>,
21 occurred_at_ms: u64,
22 },
23 Replay {
24 event: ProcessEvent,
25 repair_status: Option<ProcessStatus>,
26 wake_delivery: Option<ProcessWakeDelivery>,
27 occurred_at_ms: u64,
28 },
29}
30
31pub fn apply_process_status_projection(
32 record: &mut ProcessRecord,
33 status: ProcessStatus,
34 updated_at_ms: u64,
35) {
36 record.status = status;
37 if record.status.is_terminal() {
38 record.wait = None;
39 }
40 record.updated_at_ms = updated_at_ms;
41}
42
43pub fn prepare_process_event_append(
44 record: &ProcessRecord,
45 request: ProcessEventAppendRequest,
46 sequence: u64,
47 replay_lookup: Option<(String, ProcessEvent)>,
48 occurred_at_ms: u64,
49) -> Result<ProcessEventAppendPlan, PluginError> {
50 let process_id = record.id.as_str();
51 let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
52 if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
53 && let Some((existing_hash, existing)) = replay_lookup
54 {
55 if existing_hash == payload_hash {
56 let repair_status = existing.semantics.terminal.clone().and_then(|terminal| {
57 (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
58 });
59 let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
60 let wake_delivery = prepare_wake_delivery(
61 process_id,
62 record,
63 existing.sequence,
64 existing.event_type.clone(),
65 existing.invocation.clone(),
66 existing.occurred_at,
67 existing.semantics.wake.clone(),
68 request
69 .wake_target_scope
70 .clone()
71 .or_else(|| record.wake_target.clone()),
72 )?;
73 return Ok(ProcessEventAppendPlan::Replay {
74 event: existing,
75 repair_status,
76 wake_delivery,
77 occurred_at_ms,
78 });
79 }
80 return Err(PluginError::Session(format!(
81 "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
82 )));
83 }
84 let declared = record
85 .event_types
86 .iter()
87 .find(|declared| declared.name == request.event_type)
88 .ok_or_else(|| {
89 PluginError::Session(format!(
90 "process `{process_id}` emitted undeclared event type `{}`",
91 request.event_type
92 ))
93 })?;
94 require_event_replay(process_id, &request, &declared.semantics)?;
95 declared
96 .payload_schema
97 .validate(&request.payload)
98 .map_err(|err| {
99 PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
100 })?;
101 let semantics = materialize_process_event_semantics(
102 process_id,
103 sequence,
104 &request.payload,
105 &declared.semantics,
106 )?;
107 if semantics.terminal.is_some() && record.is_terminal() {
108 return Err(PluginError::Session(format!(
109 "process `{process_id}` is already terminal"
110 )));
111 }
112 let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
113 let event = ProcessEvent {
114 process_id: process_id.to_string(),
115 sequence,
116 event_type: request.event_type,
117 payload: request.payload,
118 invocation: crate::runtime::causal::process_event_invocation(
119 process_id,
120 sequence,
121 declared.name.as_str(),
122 request.replay,
123 ),
124 semantics: semantics.clone(),
125 occurred_at,
126 };
127 let wake_delivery = prepare_wake_delivery(
128 process_id,
129 record,
130 event.sequence,
131 event.event_type.clone(),
132 event.invocation.clone(),
133 event.occurred_at,
134 semantics.wake.clone(),
135 request
136 .wake_target_scope
137 .or_else(|| record.wake_target.clone()),
138 )?;
139 Ok(ProcessEventAppendPlan::Insert {
140 event,
141 payload_hash,
142 status_update: semantics.terminal.map(ProcessStatus::from_terminal),
143 wake_delivery,
144 occurred_at_ms,
145 })
146}
147
148#[expect(
149 clippy::too_many_arguments,
150 reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
151)]
152fn prepare_wake_delivery(
153 process_id: &str,
154 record: &ProcessRecord,
155 sequence: u64,
156 event_type: String,
157 event_invocation: crate::RuntimeInvocation,
158 occurred_at: std::time::SystemTime,
159 wake: Option<super::events::ProcessWake>,
160 wake_target_scope: Option<super::model::SessionScope>,
161) -> Result<Option<ProcessWakeDelivery>, PluginError> {
162 let Some(wake) = wake else {
163 return Ok(None);
164 };
165 let Some(target_scope) = wake_target_scope else {
166 return Ok(None);
167 };
168 process_wake_delivery(ProcessWakeDeliveryRequest {
169 target_scope,
170 process_id: process_id.to_string(),
171 sequence,
172 event_type,
173 event_invocation,
174 process_caused_by: record.provenance.caused_by.clone(),
175 wake,
176 occurred_at,
177 })
178 .map(Some)
179}
180
181pub fn prepare_process_registration(
182 mut registration: ProcessRegistration,
183) -> Result<(ProcessRegistration, String), PluginError> {
184 ensure_core_event_types(&mut registration);
185 validate_process_registration(®istration)?;
186 let registration_hash = process_registration_hash(®istration)?;
187 Ok((registration, registration_hash))
188}
189
190pub fn process_registration_hash(
191 registration: &ProcessRegistration,
192) -> Result<String, PluginError> {
193 crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
194 PluginError::Session(format!(
195 "failed to hash process `{}` registration: {err}",
196 registration.id
197 ))
198 })
199}
200
201pub fn process_event_payload_hash(
202 event_type: &str,
203 payload: &serde_json::Value,
204) -> Result<String, PluginError> {
205 crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
206 PluginError::Session(format!(
207 "failed to hash `{event_type}` process event: {err}"
208 ))
209 })
210}
211
212pub fn require_event_replay(
213 process_id: &str,
214 request: &ProcessEventAppendRequest,
215 spec: &ProcessEventSemanticsSpec,
216) -> Result<(), PluginError> {
217 let requires_key =
218 spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
219 if requires_key
220 && request
221 .replay
222 .as_ref()
223 .is_none_or(|replay| replay.key.is_empty())
224 {
225 return Err(PluginError::Session(format!(
226 "process `{process_id}` event `{}` requires a deterministic replay key",
227 request.event_type
228 )));
229 }
230 Ok(())
231}
232
233pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
234 let mut existing = registration
235 .event_types
236 .iter()
237 .map(|event_type| event_type.name.clone())
238 .collect::<HashSet<_>>();
239 for event_type in default_process_event_types() {
240 if existing.insert(event_type.name.clone()) {
241 registration.event_types.push(event_type);
242 }
243 }
244}
245
246pub(super) fn validate_process_registration(
247 registration: &ProcessRegistration,
248) -> Result<(), PluginError> {
249 if registration.id.trim().is_empty() {
250 return Err(PluginError::Session(
251 "process id must be a non-empty string".to_string(),
252 ));
253 }
254 match registration.input.as_ref() {
255 super::model::ProcessInput::ToolCall { .. } | super::model::ProcessInput::Engine { .. } => {
256 if registration.env_ref.is_none() {
257 return Err(PluginError::Session(format!(
258 "process `{}` requires a captured execution env",
259 registration.id
260 )));
261 }
262 }
263 super::model::ProcessInput::External { .. }
264 | super::model::ProcessInput::SessionTurn { .. } => {
265 if registration.env_ref.is_some() {
266 return Err(PluginError::Session(format!(
267 "process `{}` must not capture an execution env for this input kind",
268 registration.id
269 )));
270 }
271 }
272 }
273 let mut names = HashSet::new();
274 for event_type in ®istration.event_types {
275 if event_type.name.trim().is_empty() {
276 return Err(PluginError::Session(format!(
277 "process `{}` declares an empty event type",
278 registration.id
279 )));
280 }
281 if !names.insert(event_type.name.as_str()) {
282 return Err(PluginError::Session(format!(
283 "process `{}` declares duplicate event type `{}`",
284 registration.id, event_type.name
285 )));
286 }
287 if let Some(terminal) = &event_type.semantics.terminal
288 && terminal.state != ProcessTerminalState::Completed
289 && terminal.await_output.is_none()
290 {
291 return Err(PluginError::Session(format!(
292 "terminal event `{}` for process `{}` must declare await output",
293 event_type.name, registration.id
294 )));
295 }
296 }
297 Ok(())
298}