lash_core/runtime/process/
validation.rs1use std::collections::HashSet;
2
3use crate::plugin::PluginError;
4
5use super::events::{
6 ProcessEvent, ProcessEventAppendRequest, ProcessEventSemanticsSpec, ProcessTerminalState,
7 ProcessWakeDelivery, default_process_event_types,
8};
9use super::materialization::materialize_process_event_semantics;
10use super::model::{ProcessRecord, ProcessRegistration, ProcessStatus};
11use super::time::{epoch_ms_from_system_time, system_time_from_epoch_ms};
12use super::wake::{ProcessWakeDeliveryRequest, process_wake_delivery};
13
14#[derive(Clone, Debug)]
15pub struct PreparedProcessEventAppend {
16 pub event: ProcessEvent,
17 pub payload_hash: String,
18 pub status_update: Option<ProcessStatus>,
19 pub wake_delivery: Option<ProcessWakeDelivery>,
20 pub occurred_at_ms: u64,
21 pub replayed: bool,
22}
23
24pub fn prepare_process_event_append(
25 record: &ProcessRecord,
26 request: ProcessEventAppendRequest,
27 sequence: u64,
28 replay_lookup: Option<(String, ProcessEvent)>,
29 occurred_at_ms: u64,
30) -> Result<PreparedProcessEventAppend, PluginError> {
31 let process_id = record.id.as_str();
32 let payload_hash = process_event_payload_hash(&request.event_type, &request.payload)?;
33 if let Some(replay_key) = request.replay.as_ref().map(|replay| replay.key.as_str())
34 && let Some((existing_hash, existing)) = replay_lookup
35 {
36 if existing_hash == payload_hash {
37 let status_update = existing.semantics.terminal.clone().and_then(|terminal| {
38 (!record.is_terminal()).then(|| ProcessStatus::from_terminal(terminal))
39 });
40 let occurred_at_ms = epoch_ms_from_system_time(existing.occurred_at);
41 let wake_delivery = prepare_wake_delivery(
42 process_id,
43 record,
44 existing.sequence,
45 existing.event_type.clone(),
46 existing.invocation.clone(),
47 existing.occurred_at,
48 existing.semantics.wake.clone(),
49 request
50 .wake_target_scope
51 .clone()
52 .or_else(|| record.wake_target.clone()),
53 )?;
54 return Ok(PreparedProcessEventAppend {
55 event: existing,
56 payload_hash,
57 status_update,
58 wake_delivery,
59 occurred_at_ms,
60 replayed: true,
61 });
62 }
63 return Err(PluginError::Session(format!(
64 "process `{process_id}` event replay key `{replay_key}` conflicts with an existing event"
65 )));
66 }
67 let declared = record
68 .event_types
69 .iter()
70 .find(|declared| declared.name == request.event_type)
71 .ok_or_else(|| {
72 PluginError::Session(format!(
73 "process `{process_id}` emitted undeclared event type `{}`",
74 request.event_type
75 ))
76 })?;
77 require_event_replay(process_id, &request, &declared.semantics)?;
78 declared
79 .payload_schema
80 .validate(&request.payload)
81 .map_err(|err| {
82 PluginError::Session(format!("invalid `{}` payload: {err}", request.event_type))
83 })?;
84 let semantics = materialize_process_event_semantics(
85 process_id,
86 sequence,
87 &request.payload,
88 &declared.semantics,
89 )?;
90 if semantics.terminal.is_some() && record.is_terminal() {
91 return Err(PluginError::Session(format!(
92 "process `{process_id}` is already terminal"
93 )));
94 }
95 let occurred_at = system_time_from_epoch_ms(occurred_at_ms);
96 let event = ProcessEvent {
97 process_id: process_id.to_string(),
98 sequence,
99 event_type: request.event_type,
100 payload: request.payload,
101 invocation: crate::runtime::causal::process_event_invocation(
102 process_id,
103 sequence,
104 declared.name.as_str(),
105 request.replay,
106 ),
107 semantics: semantics.clone(),
108 occurred_at,
109 };
110 let wake_delivery = prepare_wake_delivery(
111 process_id,
112 record,
113 event.sequence,
114 event.event_type.clone(),
115 event.invocation.clone(),
116 event.occurred_at,
117 semantics.wake.clone(),
118 request
119 .wake_target_scope
120 .or_else(|| record.wake_target.clone()),
121 )?;
122 Ok(PreparedProcessEventAppend {
123 event,
124 payload_hash,
125 status_update: semantics.terminal.map(ProcessStatus::from_terminal),
126 wake_delivery,
127 occurred_at_ms,
128 replayed: false,
129 })
130}
131
132#[expect(
133 clippy::too_many_arguments,
134 reason = "wake delivery mirrors the persisted event plus its optional materialized wake"
135)]
136fn prepare_wake_delivery(
137 process_id: &str,
138 record: &ProcessRecord,
139 sequence: u64,
140 event_type: String,
141 event_invocation: crate::RuntimeInvocation,
142 occurred_at: std::time::SystemTime,
143 wake: Option<super::events::ProcessWake>,
144 wake_target_scope: Option<super::model::SessionScope>,
145) -> Result<Option<ProcessWakeDelivery>, PluginError> {
146 let Some(wake) = wake else {
147 return Ok(None);
148 };
149 let Some(target_scope) = wake_target_scope else {
150 return Ok(None);
151 };
152 process_wake_delivery(ProcessWakeDeliveryRequest {
153 target_scope,
154 process_id: process_id.to_string(),
155 sequence,
156 event_type,
157 event_invocation,
158 process_caused_by: record.provenance.caused_by.clone(),
159 wake,
160 occurred_at,
161 })
162 .map(Some)
163}
164
165pub fn prepare_process_registration(
166 mut registration: ProcessRegistration,
167) -> Result<(ProcessRegistration, String), PluginError> {
168 ensure_core_event_types(&mut registration);
169 validate_process_registration(®istration)?;
170 let registration_hash = process_registration_hash(®istration)?;
171 Ok((registration, registration_hash))
172}
173
174pub fn process_registration_hash(
175 registration: &ProcessRegistration,
176) -> Result<String, PluginError> {
177 crate::stable_hash::stable_json_sha256_hex(registration).map_err(|err| {
178 PluginError::Session(format!(
179 "failed to hash process `{}` registration: {err}",
180 registration.id
181 ))
182 })
183}
184
185pub fn process_event_payload_hash(
186 event_type: &str,
187 payload: &serde_json::Value,
188) -> Result<String, PluginError> {
189 crate::stable_hash::stable_json_sha256_hex(&(event_type, payload)).map_err(|err| {
190 PluginError::Session(format!(
191 "failed to hash `{event_type}` process event: {err}"
192 ))
193 })
194}
195
196pub fn require_event_replay(
197 process_id: &str,
198 request: &ProcessEventAppendRequest,
199 spec: &ProcessEventSemanticsSpec,
200) -> Result<(), PluginError> {
201 let requires_key =
202 spec.terminal.is_some() || request.event_type.as_str() == "process.cancel_requested";
203 if requires_key
204 && request
205 .replay
206 .as_ref()
207 .is_none_or(|replay| replay.key.is_empty())
208 {
209 return Err(PluginError::Session(format!(
210 "process `{process_id}` event `{}` requires a deterministic replay key",
211 request.event_type
212 )));
213 }
214 Ok(())
215}
216
217pub(super) fn ensure_core_event_types(registration: &mut ProcessRegistration) {
218 let mut existing = registration
219 .event_types
220 .iter()
221 .map(|event_type| event_type.name.clone())
222 .collect::<HashSet<_>>();
223 for event_type in default_process_event_types() {
224 if existing.insert(event_type.name.clone()) {
225 registration.event_types.push(event_type);
226 }
227 }
228}
229
230pub(super) fn validate_process_registration(
231 registration: &ProcessRegistration,
232) -> Result<(), PluginError> {
233 if registration.id.trim().is_empty() {
234 return Err(PluginError::Session(
235 "process id must be a non-empty string".to_string(),
236 ));
237 }
238 if registration.provenance.host_profile_id.trim().is_empty() {
239 return Err(PluginError::Session(format!(
240 "process `{}` host profile id must be non-empty",
241 registration.id
242 )));
243 }
244 match registration.input.as_ref() {
245 super::model::ProcessInput::ToolCall { .. }
246 | super::model::ProcessInput::LashlangProcess { .. } => {
247 if registration.env_ref.is_none() {
248 return Err(PluginError::Session(format!(
249 "process `{}` requires a captured execution env",
250 registration.id
251 )));
252 }
253 }
254 super::model::ProcessInput::External { .. }
255 | super::model::ProcessInput::SessionTurn { .. } => {
256 if registration.env_ref.is_some() {
257 return Err(PluginError::Session(format!(
258 "process `{}` must not capture an execution env for this input kind",
259 registration.id
260 )));
261 }
262 }
263 }
264 let mut names = HashSet::new();
265 for event_type in ®istration.event_types {
266 if event_type.name.trim().is_empty() {
267 return Err(PluginError::Session(format!(
268 "process `{}` declares an empty event type",
269 registration.id
270 )));
271 }
272 if !names.insert(event_type.name.as_str()) {
273 return Err(PluginError::Session(format!(
274 "process `{}` declares duplicate event type `{}`",
275 registration.id, event_type.name
276 )));
277 }
278 if let Some(terminal) = &event_type.semantics.terminal
279 && terminal.state != ProcessTerminalState::Completed
280 && terminal.await_output.is_none()
281 {
282 return Err(PluginError::Session(format!(
283 "terminal event `{}` for process `{}` must declare await output",
284 event_type.name, registration.id
285 )));
286 }
287 }
288 Ok(())
289}