Skip to main content

lash_core/runtime/process/
events.rs

1use std::collections::BTreeMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5
6use super::model::{ProcessId, SessionScope, SessionScopeId};
7use super::validation::process_event_payload_hash;
8
9#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
10pub struct ProcessEventType {
11    pub name: String,
12    pub payload_schema: crate::LashSchema,
13    pub semantics: ProcessEventSemanticsSpec,
14}
15
16#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
17pub struct ProcessEventSemanticsSpec {
18    #[serde(default, skip_serializing_if = "Option::is_none")]
19    pub terminal: Option<ProcessTerminalSpec>,
20    #[serde(default, skip_serializing_if = "Option::is_none")]
21    pub wake: Option<ProcessWakeSpec>,
22}
23
24#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
25pub struct ProcessTerminalSpec {
26    pub state: ProcessTerminalState,
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub await_output: Option<ProcessValueSelector>,
29}
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub struct ProcessWakeSpec {
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub when: Option<ProcessValueSelector>,
35    pub input: ProcessValueSelector,
36    #[serde(default)]
37    pub dedupe_key: ProcessWakeDedupeKey,
38}
39
40#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum ProcessWakeDedupeKey {
43    #[default]
44    EventIdentity,
45    Selector(ProcessValueSelector),
46    Const(String),
47}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum ProcessValueSelector {
52    Payload,
53    Pointer(String),
54    Const(serde_json::Value),
55    Template {
56        template: String,
57        #[serde(default)]
58        fields: BTreeMap<String, ProcessValueSelector>,
59    },
60    Present(String),
61}
62
63#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
64pub struct ProcessEventSemantics {
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub terminal: Option<ProcessTerminalSemantics>,
67    #[serde(default, skip_serializing_if = "Option::is_none")]
68    pub wake: Option<ProcessWake>,
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "snake_case")]
73pub enum ProcessTerminalState {
74    Completed,
75    Failed,
76    Cancelled,
77    /// The owner stopped executing the work without recording an outcome. The
78    /// true result is unknowable and no cleanup is assumed to have run. Peer of
79    /// the other three terminals; see ADR 0019.
80    Abandoned,
81}
82
83/// Who wrote an [`ProcessTerminalState::Abandoned`] terminal — the exactly-one
84/// legitimate writer per path (ADR 0019).
85#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
86#[serde(rename_all = "snake_case")]
87pub enum AbandonWriter {
88    /// The owner abandoned its own OwnerBound work inline at graceful drain,
89    /// under its own live lease.
90    OwnerDrain,
91    /// The recovery sweep abandoned an OwnerBound, started row whose holder is
92    /// provably dead.
93    Sweep,
94    /// The sweep reconciled a durable Abandon Request into Abandoned once the
95    /// row's lease had lapsed.
96    ReconciledRequest,
97}
98
99/// Evidence attached to an [`ProcessTerminalState::Abandoned`] terminal: which
100/// path wrote it, the dead-or-lapsed owner identity it was established against
101/// (absent for an externally-owned row lash never executed), and when.
102#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
103pub struct AbandonEvidence {
104    pub writer: AbandonWriter,
105    #[serde(default, skip_serializing_if = "Option::is_none")]
106    pub owner: Option<crate::LeaseOwnerIdentity>,
107    pub epoch_ms: u64,
108}
109
110#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
111pub struct ProcessTerminalSemantics {
112    pub state: ProcessTerminalState,
113    pub await_output: ProcessAwaitOutput,
114}
115
116#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
117#[serde(tag = "type", rename_all = "snake_case")]
118pub enum ProcessAwaitOutput {
119    Success {
120        value: serde_json::Value,
121        #[serde(default, skip_serializing_if = "Option::is_none")]
122        control: Option<crate::ToolControl>,
123    },
124    Failure {
125        class: crate::ToolFailureClass,
126        code: String,
127        message: String,
128        #[serde(default, skip_serializing_if = "Option::is_none")]
129        raw: Option<serde_json::Value>,
130        #[serde(default, skip_serializing_if = "Option::is_none")]
131        control: Option<crate::ToolControl>,
132    },
133    Cancelled {
134        message: String,
135        #[serde(default, skip_serializing_if = "Option::is_none")]
136        raw: Option<serde_json::Value>,
137        #[serde(default, skip_serializing_if = "Option::is_none")]
138        control: Option<crate::ToolControl>,
139    },
140    /// The owner stopped executing without recording an outcome. Written only by
141    /// the sweep or an owner's graceful drain, never round-tripped from a tool
142    /// (a tool cannot self-report abandonment); see [`AbandonEvidence`]. The
143    /// evidence is boxed so this rare terminal does not enlarge the pervasive
144    /// `ProcessAwaitOutput` that flows through every tool result.
145    Abandoned {
146        evidence: Box<AbandonEvidence>,
147        #[serde(default, skip_serializing_if = "Option::is_none")]
148        control: Option<crate::ToolControl>,
149    },
150}
151
152impl ProcessAwaitOutput {
153    pub fn terminal_state(&self) -> ProcessTerminalState {
154        match self {
155            Self::Success { .. } => ProcessTerminalState::Completed,
156            Self::Failure { .. } => ProcessTerminalState::Failed,
157            Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
158            Self::Abandoned { .. } => ProcessTerminalState::Abandoned,
159        }
160    }
161
162    pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
163        let control = output.control;
164        match output.outcome {
165            crate::ToolCallOutcome::Success(value) => Self::Success {
166                value: value.to_json_value(),
167                control,
168            },
169            crate::ToolCallOutcome::Failure(failure) => Self::Failure {
170                class: failure.class,
171                code: failure.code,
172                message: failure.message,
173                raw: failure.raw.map(|value| value.to_json_value()),
174                control,
175            },
176            crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
177                message: cancellation.message,
178                raw: cancellation.raw.map(|value| value.to_json_value()),
179                control,
180            },
181        }
182    }
183
184    pub fn into_tool_output(self) -> crate::ToolCallOutput {
185        match self {
186            Self::Success { value, control } => {
187                let mut output = crate::ToolCallOutput::success(value);
188                output.control = control;
189                output
190            }
191            Self::Failure {
192                class,
193                code,
194                message,
195                raw,
196                control,
197            } => {
198                let mut failure = crate::ToolFailure::tool(class, code, message);
199                failure.raw = raw.map(crate::ToolValue::from);
200                let mut output = crate::ToolCallOutput::failure(failure);
201                output.control = control;
202                output
203            }
204            Self::Cancelled {
205                message,
206                raw,
207                control,
208            } => {
209                let mut cancellation = crate::ToolCancellation::runtime(message);
210                cancellation.raw = raw.map(crate::ToolValue::from);
211                let mut output = crate::ToolCallOutput::cancelled(cancellation);
212                output.control = control;
213                output
214            }
215            // Abandonment has no `ToolCallOutcome` peer: a tool never self-reports
216            // it. To a caller awaiting the result it surfaces one-directionally as
217            // an external failure whose raw payload names it abandoned and carries
218            // the evidence, while the process layer keeps `Abandoned` a distinct
219            // terminal (ADR 0019). `from_tool_output` therefore never reverses this.
220            Self::Abandoned { evidence, control } => {
221                let raw = serde_json::to_value(&evidence)
222                    .ok()
223                    .map(crate::ToolValue::from);
224                let message = match evidence.writer {
225                    AbandonWriter::OwnerDrain => {
226                        "process abandoned: owner drained without recording an outcome".to_string()
227                    }
228                    AbandonWriter::Sweep => {
229                        "process abandoned: recovery observed the owner provably dead".to_string()
230                    }
231                    AbandonWriter::ReconciledRequest => {
232                        "process abandoned: reconciled abandon request after the lease lapsed"
233                            .to_string()
234                    }
235                };
236                let mut failure = crate::ToolFailure::tool(
237                    crate::ToolFailureClass::External,
238                    "process_abandoned",
239                    message,
240                );
241                failure.raw = raw;
242                let mut output = crate::ToolCallOutput::failure(failure);
243                output.control = control;
244                output
245            }
246        }
247    }
248}
249
250#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
251pub struct ProcessWake {
252    pub input: String,
253    pub dedupe_key: String,
254}
255
256pub fn process_signal_event_type(signal_name: &str) -> Result<String, crate::PluginError> {
257    validate_process_signal_name(signal_name)?;
258    Ok(format!("signal.{signal_name}"))
259}
260
/// Extracts the signal name from an event type of the form `signal.<name>`.
///
/// Returns `None` when `event_type` does not start with `signal.`. The
/// remainder is returned as-is and is not validated, so `"signal."` yields
/// `Some("")`.
pub fn process_signal_name_from_event_type(event_type: &str) -> Option<&str> {
    let name = event_type.strip_prefix("signal.")?;
    Some(name)
}
264
/// Builds the wait key `process:{process_id}:signal.{signal_name}:{ordinal}`.
///
/// The inputs are inserted verbatim; nothing is validated or escaped.
pub fn process_signal_wait_key(process_id: &str, signal_name: &str, ordinal: u64) -> String {
    let ordinal = ordinal.to_string();
    [
        "process:",
        process_id,
        ":signal.",
        signal_name,
        ":",
        ordinal.as_str(),
    ]
    .concat()
}
268
269pub fn validate_process_signal_name(signal_name: &str) -> Result<(), crate::PluginError> {
270    let valid = !signal_name.is_empty()
271        && signal_name
272            .chars()
273            .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-');
274    if valid {
275        Ok(())
276    } else {
277        Err(crate::PluginError::Session(format!(
278            "process signal name must be non-empty and contain only ASCII letters, digits, `_`, or `-`, got `{signal_name}`"
279        )))
280    }
281}
282
283#[derive(Clone, Debug, Serialize, Deserialize)]
284pub struct ProcessEvent {
285    pub process_id: ProcessId,
286    pub sequence: u64,
287    pub event_type: String,
288    pub payload: serde_json::Value,
289    pub invocation: crate::RuntimeInvocation,
290    pub semantics: ProcessEventSemantics,
291    pub occurred_at: SystemTime,
292}
293
294#[derive(Clone, Debug, Serialize, Deserialize)]
295pub struct ProcessEventAppendResult {
296    pub event: ProcessEvent,
297    #[serde(default, skip_serializing_if = "Option::is_none")]
298    pub wake_delivery: Option<ProcessWakeDelivery>,
299}
300
301#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
302pub struct ProcessEventAppendRequest {
303    pub event_type: String,
304    pub payload: serde_json::Value,
305    #[serde(default, skip_serializing_if = "Option::is_none")]
306    pub replay: Option<crate::RuntimeReplay>,
307    #[serde(default, skip_serializing_if = "Option::is_none")]
308    pub wake_target_scope: Option<SessionScope>,
309}
310
311impl ProcessEventAppendRequest {
312    pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
313        Self {
314            event_type: event_type.into(),
315            payload,
316            replay: None,
317            wake_target_scope: None,
318        }
319    }
320
321    pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
322        self.replay = Some(crate::RuntimeReplay {
323            key: replay_key.into(),
324        });
325        self
326    }
327
328    pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
329        self.replay = replay;
330        self
331    }
332
333    pub fn with_wake_target_scope(mut self, scope: SessionScope) -> Self {
334        self.wake_target_scope = Some(scope);
335        self
336    }
337
338    pub fn with_optional_wake_target_scope(mut self, scope: Option<SessionScope>) -> Self {
339        self.wake_target_scope = scope;
340        self
341    }
342
343    pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
344        let payload = serde_json::json!({
345            "reason": reason,
346        });
347        let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
348            .unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
349        Self::new("process.cancel_requested", payload).with_replay_key(format!(
350            "process:{process_id}:cancel_requested:{replay_key}"
351        ))
352    }
353}
354
355#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
356pub struct ProcessWakeDelivery {
357    pub wake_id: String,
358    pub target_session_id: String,
359    pub target_scope_id: SessionScopeId,
360    pub process_id: ProcessId,
361    pub sequence: u64,
362    #[serde(default = "default_process_wake_event_type")]
363    pub event_type: String,
364    #[serde(default = "default_process_wake_event_invocation")]
365    pub event_invocation: crate::RuntimeInvocation,
366    #[serde(default, skip_serializing_if = "Option::is_none")]
367    pub process_caused_by: Option<crate::CausalRef>,
368    pub dedupe_key: String,
369    pub input: String,
370    pub created_at_ms: u64,
371}
372
/// Serde default for `ProcessWakeDelivery::event_type`: the string
/// `process.wake`.
fn default_process_wake_event_type() -> String {
    String::from("process.wake")
}
376
377fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
378    crate::RuntimeInvocation {
379        scope: crate::RuntimeScope::new(""),
380        subject: crate::RuntimeSubject::ProcessEvent {
381            process_id: String::new(),
382            sequence: 0,
383            event_type: default_process_wake_event_type(),
384        },
385        caused_by: None,
386        replay: None,
387    }
388}
389
390pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
391    vec![
392        ProcessEventType {
393            name: "process.cancel_requested".to_string(),
394            payload_schema: crate::LashSchema::any(),
395            semantics: ProcessEventSemanticsSpec::default(),
396        },
397        ProcessEventType {
398            name: "process.waiting".to_string(),
399            payload_schema: crate::LashSchema::any(),
400            semantics: ProcessEventSemanticsSpec::default(),
401        },
402        ProcessEventType {
403            name: "process.resumed".to_string(),
404            payload_schema: crate::LashSchema::any(),
405            semantics: ProcessEventSemanticsSpec::default(),
406        },
407        terminal_event_type("process.completed", ProcessTerminalState::Completed),
408        terminal_event_type("process.failed", ProcessTerminalState::Failed),
409        terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
410        terminal_event_type("process.abandoned", ProcessTerminalState::Abandoned),
411    ]
412}
413
414fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
415    ProcessEventType {
416        name: name.to_string(),
417        payload_schema: crate::LashSchema::any(),
418        semantics: ProcessEventSemanticsSpec {
419            terminal: Some(ProcessTerminalSpec {
420                state,
421                await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
422            }),
423            ..ProcessEventSemanticsSpec::default()
424        },
425    }
426}