1use std::collections::BTreeMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5
6use super::model::{ProcessId, SessionScope, SessionScopeId};
7use super::validation::process_event_payload_hash;
8
/// Declaration of a process event type: its name, the schema of its payload,
/// and the semantics spec attached to events of that type.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventType {
    /// Event type name, e.g. `process.completed` or `process.waiting`.
    pub name: String,
    /// Schema describing the event payload.
    /// NOTE(review): presumably payloads are validated against this schema;
    /// the validation itself is not in this file.
    pub payload_schema: crate::LashSchema,
    /// Optional terminal / wake semantics declared for this event type.
    pub semantics: ProcessEventSemanticsSpec,
}
15
/// Declarative semantics of an event type.
///
/// Both parts are optional; the `Default` value has neither. A `None` part is
/// omitted when serializing and is accepted as missing when deserializing.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventSemanticsSpec {
    /// Set when events of this type end the process.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub terminal: Option<ProcessTerminalSpec>,
    /// Set when events of this type may produce a wake.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake: Option<ProcessWakeSpec>,
}
23
/// Terminal part of an event type's semantics spec.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessTerminalSpec {
    /// Terminal state the process enters.
    pub state: ProcessTerminalState,
    /// Optional selector for the await output. The built-in terminal event
    /// types use `Pointer("/await_output")`.
    /// NOTE(review): presumably evaluated against the event payload; the
    /// evaluation is not in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub await_output: Option<ProcessValueSelector>,
}
30
/// Wake part of an event type's semantics spec.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWakeSpec {
    /// Optional condition selector.
    /// NOTE(review): presumably the wake is only produced when this selects a
    /// truthy value — confirm against the evaluator.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub when: Option<ProcessValueSelector>,
    /// Selector for the wake input.
    pub input: ProcessValueSelector,
    /// How the wake's dedupe key is derived. Falls back to
    /// `ProcessWakeDedupeKey::EventIdentity` when absent on deserialization.
    #[serde(default)]
    pub dedupe_key: ProcessWakeDedupeKey,
}
39
/// Source of a wake's dedupe key. Variant names serialize in `snake_case`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessWakeDedupeKey {
    /// Default variant.
    /// NOTE(review): presumably derives the key from the identity of the
    /// triggering event — confirm against the evaluator.
    #[default]
    EventIdentity,
    /// Key taken from a value selector.
    Selector(ProcessValueSelector),
    /// Fixed key string.
    Const(String),
}
48
/// Describes how to obtain a value for an event's semantics. Variant names
/// serialize in `snake_case`.
///
/// NOTE(review): the evaluator for these selectors is not in this file; the
/// per-variant notes below are assumptions to confirm against it.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessValueSelector {
    /// Presumably selects the whole event payload.
    Payload,
    /// Presumably a JSON-Pointer-style path into the payload; the built-in
    /// terminal event types use `/await_output`.
    Pointer(String),
    /// A fixed JSON value.
    Const(serde_json::Value),
    /// A template string plus named sub-selectors.
    Template {
        /// Template text. Placeholder syntax is not defined in this file.
        template: String,
        /// Named selectors for the template; empty when absent on
        /// deserialization.
        #[serde(default)]
        fields: BTreeMap<String, ProcessValueSelector>,
    },
    /// Presumably tests whether the given path is present in the payload.
    Present(String),
}
62
/// Concrete semantics carried on a `ProcessEvent`, holding resolved values
/// rather than selectors.
///
/// Both parts are optional; the `Default` value has neither, and a `None`
/// part is omitted when serializing.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventSemantics {
    /// Terminal state and await output, when the event is terminal.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub terminal: Option<ProcessTerminalSemantics>,
    /// Wake input and dedupe key, when the event carries a wake.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake: Option<ProcessWake>,
}
70
/// Terminal state of a process. Variant names serialize in `snake_case`
/// (`completed`, `failed`, `cancelled`, `abandoned`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ProcessTerminalState {
    /// State reported for `ProcessAwaitOutput::Success`.
    Completed,
    /// State reported for `ProcessAwaitOutput::Failure`.
    Failed,
    /// State reported for `ProcessAwaitOutput::Cancelled`.
    Cancelled,
    /// State reported for `ProcessAwaitOutput::Abandoned`.
    Abandoned,
}
82
/// Identifies which writer recorded an abandonment. Variant names serialize
/// in `snake_case`.
///
/// The per-variant descriptions mirror the messages produced by
/// `ProcessAwaitOutput::into_tool_output`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AbandonWriter {
    /// The owner drained without recording an outcome.
    OwnerDrain,
    /// Recovery observed the owner provably dead.
    Sweep,
    /// An abandon request was reconciled after the lease lapsed.
    ReconciledRequest,
}
98
/// Evidence recorded alongside an abandoned terminal outcome.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AbandonEvidence {
    /// Which writer recorded the abandonment.
    pub writer: AbandonWriter,
    /// Lease owner identity, when known; omitted from serialized output when
    /// `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub owner: Option<crate::LeaseOwnerIdentity>,
    /// NOTE(review): presumably a Unix-epoch timestamp in milliseconds for
    /// when the abandonment was recorded — confirm against the writers.
    pub epoch_ms: u64,
}
109
/// Resolved terminal semantics of an event: the terminal state together with
/// the output handed to awaiters.
///
/// NOTE(review): nothing in this file enforces that `state` equals
/// `await_output.terminal_state()`; confirm where that invariant is checked.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessTerminalSemantics {
    /// Terminal state the process entered.
    pub state: ProcessTerminalState,
    /// Output associated with the terminal event.
    pub await_output: ProcessAwaitOutput,
}
115
/// Outcome of a finished process as seen by whoever awaits it.
///
/// Serialized as an internally tagged enum: the variant name goes in a `type`
/// field in `snake_case` (`success`, `failure`, `cancelled`, `abandoned`).
/// Optional fields are omitted from serialized output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessAwaitOutput {
    /// The process completed and produced a value.
    Success {
        /// Result value as JSON.
        value: serde_json::Value,
        /// Optional tool control carried with the output.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
    /// The process failed.
    Failure {
        /// Failure classification.
        class: crate::ToolFailureClass,
        /// Failure code.
        code: String,
        /// Failure message.
        message: String,
        /// Optional raw failure details as JSON.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw: Option<serde_json::Value>,
        /// Optional tool control carried with the output.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
    /// The process was cancelled.
    Cancelled {
        /// Cancellation message.
        message: String,
        /// Optional raw cancellation details as JSON.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        raw: Option<serde_json::Value>,
        /// Optional tool control carried with the output.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
    /// The process was abandoned without recording its own outcome.
    Abandoned {
        /// Who recorded the abandonment and the accompanying details.
        evidence: Box<AbandonEvidence>,
        /// Optional tool control carried with the output.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        control: Option<crate::ToolControl>,
    },
}
151
152impl ProcessAwaitOutput {
153 pub fn terminal_state(&self) -> ProcessTerminalState {
154 match self {
155 Self::Success { .. } => ProcessTerminalState::Completed,
156 Self::Failure { .. } => ProcessTerminalState::Failed,
157 Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
158 Self::Abandoned { .. } => ProcessTerminalState::Abandoned,
159 }
160 }
161
162 pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
163 let control = output.control;
164 match output.outcome {
165 crate::ToolCallOutcome::Success(value) => Self::Success {
166 value: value.to_json_value(),
167 control,
168 },
169 crate::ToolCallOutcome::Failure(failure) => Self::Failure {
170 class: failure.class,
171 code: failure.code,
172 message: failure.message,
173 raw: failure.raw.map(|value| value.to_json_value()),
174 control,
175 },
176 crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
177 message: cancellation.message,
178 raw: cancellation.raw.map(|value| value.to_json_value()),
179 control,
180 },
181 }
182 }
183
184 pub fn into_tool_output(self) -> crate::ToolCallOutput {
185 match self {
186 Self::Success { value, control } => {
187 let mut output = crate::ToolCallOutput::success(value);
188 output.control = control;
189 output
190 }
191 Self::Failure {
192 class,
193 code,
194 message,
195 raw,
196 control,
197 } => {
198 let mut failure = crate::ToolFailure::tool(class, code, message);
199 failure.raw = raw.map(crate::ToolValue::from);
200 let mut output = crate::ToolCallOutput::failure(failure);
201 output.control = control;
202 output
203 }
204 Self::Cancelled {
205 message,
206 raw,
207 control,
208 } => {
209 let mut cancellation = crate::ToolCancellation::runtime(message);
210 cancellation.raw = raw.map(crate::ToolValue::from);
211 let mut output = crate::ToolCallOutput::cancelled(cancellation);
212 output.control = control;
213 output
214 }
215 Self::Abandoned { evidence, control } => {
221 let raw = serde_json::to_value(&evidence)
222 .ok()
223 .map(crate::ToolValue::from);
224 let message = match evidence.writer {
225 AbandonWriter::OwnerDrain => {
226 "process abandoned: owner drained without recording an outcome".to_string()
227 }
228 AbandonWriter::Sweep => {
229 "process abandoned: recovery observed the owner provably dead".to_string()
230 }
231 AbandonWriter::ReconciledRequest => {
232 "process abandoned: reconciled abandon request after the lease lapsed"
233 .to_string()
234 }
235 };
236 let mut failure = crate::ToolFailure::tool(
237 crate::ToolFailureClass::External,
238 "process_abandoned",
239 message,
240 );
241 failure.raw = raw;
242 let mut output = crate::ToolCallOutput::failure(failure);
243 output.control = control;
244 output
245 }
246 }
247 }
248}
249
/// Resolved wake carried in an event's semantics: plain strings rather than
/// the selectors of `ProcessWakeSpec`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWake {
    /// Wake input text.
    pub input: String,
    /// Key used to deduplicate wakes.
    pub dedupe_key: String,
}
255
256pub fn process_signal_event_type(signal_name: &str) -> Result<String, crate::PluginError> {
257 validate_process_signal_name(signal_name)?;
258 Ok(format!("signal.{signal_name}"))
259}
260
/// Extracts the signal name from an event type of the form `signal.<name>`.
///
/// Returns `None` when `event_type` does not start with `signal.`. The
/// remainder is returned as-is, without validation: it may be empty or
/// contain further dots.
pub fn process_signal_name_from_event_type(event_type: &str) -> Option<&str> {
    // Splitting at the first `.` is equivalent to stripping the `signal.`
    // prefix, because `signal` itself contains no dot.
    match event_type.split_once('.') {
        Some(("signal", signal_name)) => Some(signal_name),
        _ => None,
    }
}
264
/// Builds the wait key for the `ordinal`-th wait on a signal of a process:
/// `process:<process_id>:signal.<signal_name>:<ordinal>`.
///
/// Neither the process id nor the signal name is validated here.
pub fn process_signal_wait_key(process_id: &str, signal_name: &str, ordinal: u64) -> String {
    // The middle segment has the same shape as a signal event type name.
    let event_type = format!("signal.{signal_name}");
    format!("process:{process_id}:{event_type}:{ordinal}")
}
268
269pub fn validate_process_signal_name(signal_name: &str) -> Result<(), crate::PluginError> {
270 let valid = !signal_name.is_empty()
271 && signal_name
272 .chars()
273 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-');
274 if valid {
275 Ok(())
276 } else {
277 Err(crate::PluginError::Session(format!(
278 "process signal name must be non-empty and contain only ASCII letters, digits, `_`, or `-`, got `{signal_name}`"
279 )))
280 }
281}
282
/// A recorded process event.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessEvent {
    /// Process the event belongs to.
    pub process_id: ProcessId,
    /// Sequence number of the event.
    /// NOTE(review): presumably monotonically increasing per process —
    /// confirm against the store that assigns it.
    pub sequence: u64,
    /// Event type name, e.g. `process.completed` or `signal.<name>`.
    pub event_type: String,
    /// Event payload as JSON.
    pub payload: serde_json::Value,
    /// Runtime invocation associated with the event.
    pub invocation: crate::RuntimeInvocation,
    /// Resolved terminal / wake semantics of the event.
    pub semantics: ProcessEventSemantics,
    /// Time recorded for the event.
    pub occurred_at: SystemTime,
}
293
/// Result of appending an event: the stored event plus the wake delivery it
/// produced, if any.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProcessEventAppendResult {
    /// The event as recorded.
    pub event: ProcessEvent,
    /// Wake delivery produced by the append; omitted from serialized output
    /// when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_delivery: Option<ProcessWakeDelivery>,
}
300
/// Request to append an event to a process.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessEventAppendRequest {
    /// Event type name.
    pub event_type: String,
    /// Event payload as JSON.
    pub payload: serde_json::Value,
    /// Optional replay information carrying a replay key.
    /// NOTE(review): presumably used to make repeated appends idempotent —
    /// confirm against the store.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replay: Option<crate::RuntimeReplay>,
    /// Optional session scope that a resulting wake should target.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wake_target_scope: Option<SessionScope>,
}
310
311impl ProcessEventAppendRequest {
312 pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
313 Self {
314 event_type: event_type.into(),
315 payload,
316 replay: None,
317 wake_target_scope: None,
318 }
319 }
320
321 pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
322 self.replay = Some(crate::RuntimeReplay {
323 key: replay_key.into(),
324 });
325 self
326 }
327
328 pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
329 self.replay = replay;
330 self
331 }
332
333 pub fn with_wake_target_scope(mut self, scope: SessionScope) -> Self {
334 self.wake_target_scope = Some(scope);
335 self
336 }
337
338 pub fn with_optional_wake_target_scope(mut self, scope: Option<SessionScope>) -> Self {
339 self.wake_target_scope = scope;
340 self
341 }
342
343 pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
344 let payload = serde_json::json!({
345 "reason": reason,
346 });
347 let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
348 .unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
349 Self::new("process.cancel_requested", payload).with_replay_key(format!(
350 "process:{process_id}:cancel_requested:{replay_key}"
351 ))
352 }
353}
354
/// A wake to be delivered to a target session scope as the result of a
/// process event.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ProcessWakeDelivery {
    /// Identifier of this wake.
    pub wake_id: String,
    /// Session the wake is delivered to.
    pub target_session_id: String,
    /// Session scope the wake is delivered to.
    pub target_scope_id: SessionScopeId,
    /// Process whose event produced the wake.
    pub process_id: ProcessId,
    /// Sequence number of the originating event.
    pub sequence: u64,
    /// Type of the originating event. Defaults to `process.wake` when the
    /// field is missing on deserialization.
    /// NOTE(review): presumably for records written before the field
    /// existed — confirm.
    #[serde(default = "default_process_wake_event_type")]
    pub event_type: String,
    /// Invocation of the originating event. When missing on deserialization,
    /// defaults to a placeholder with an empty scope, an empty process id and
    /// sequence `0`.
    #[serde(default = "default_process_wake_event_invocation")]
    pub event_invocation: crate::RuntimeInvocation,
    /// Optional causal reference for the process; omitted from serialized
    /// output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub process_caused_by: Option<crate::CausalRef>,
    /// Key used to deduplicate wakes.
    pub dedupe_key: String,
    /// Wake input text.
    pub input: String,
    /// NOTE(review): presumably the creation time as Unix-epoch
    /// milliseconds — confirm against the producer.
    pub created_at_ms: u64,
}
372
/// Serde default for `ProcessWakeDelivery::event_type`: `process.wake`.
fn default_process_wake_event_type() -> String {
    String::from("process.wake")
}
376
377fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
378 crate::RuntimeInvocation {
379 scope: crate::RuntimeScope::new(""),
380 subject: crate::RuntimeSubject::ProcessEvent {
381 process_id: String::new(),
382 sequence: 0,
383 event_type: default_process_wake_event_type(),
384 },
385 caused_by: None,
386 replay: None,
387 }
388}
389
390pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
391 vec![
392 ProcessEventType {
393 name: "process.cancel_requested".to_string(),
394 payload_schema: crate::LashSchema::any(),
395 semantics: ProcessEventSemanticsSpec::default(),
396 },
397 ProcessEventType {
398 name: "process.waiting".to_string(),
399 payload_schema: crate::LashSchema::any(),
400 semantics: ProcessEventSemanticsSpec::default(),
401 },
402 ProcessEventType {
403 name: "process.resumed".to_string(),
404 payload_schema: crate::LashSchema::any(),
405 semantics: ProcessEventSemanticsSpec::default(),
406 },
407 terminal_event_type("process.completed", ProcessTerminalState::Completed),
408 terminal_event_type("process.failed", ProcessTerminalState::Failed),
409 terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
410 terminal_event_type("process.abandoned", ProcessTerminalState::Abandoned),
411 ]
412}
413
414fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
415 ProcessEventType {
416 name: name.to_string(),
417 payload_schema: crate::LashSchema::any(),
418 semantics: ProcessEventSemanticsSpec {
419 terminal: Some(ProcessTerminalSpec {
420 state,
421 await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
422 }),
423 ..ProcessEventSemanticsSpec::default()
424 },
425 }
426}