Skip to main content

lash_core/runtime/process/
events.rs

1use std::collections::BTreeMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5
6use super::model::{ProcessId, ProcessScope, ProcessScopeId};
7use super::validation::process_event_payload_hash;
8
9#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
10pub struct ProcessEventType {
11    pub name: String,
12    pub payload_schema: crate::LashSchema,
13    pub semantics: ProcessEventSemanticsSpec,
14}
15
16#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
17pub struct ProcessEventSemanticsSpec {
18    #[serde(default, skip_serializing_if = "Option::is_none")]
19    pub terminal: Option<ProcessTerminalSpec>,
20    #[serde(default, skip_serializing_if = "Option::is_none")]
21    pub wake: Option<ProcessWakeSpec>,
22}
23
24#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
25pub struct ProcessTerminalSpec {
26    pub state: ProcessTerminalState,
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub await_output: Option<ProcessValueSelector>,
29}
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub struct ProcessWakeSpec {
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub when: Option<ProcessValueSelector>,
35    pub input: ProcessValueSelector,
36    #[serde(default)]
37    pub dedupe_key: ProcessWakeDedupeKey,
38}
39
40#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum ProcessWakeDedupeKey {
43    #[default]
44    EventIdentity,
45    Selector(ProcessValueSelector),
46    Const(String),
47}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum ProcessValueSelector {
52    Payload,
53    Pointer(String),
54    Const(serde_json::Value),
55    Template {
56        template: String,
57        #[serde(default)]
58        fields: BTreeMap<String, ProcessValueSelector>,
59    },
60    Present(String),
61}
62
63#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
64pub struct ProcessEventSemantics {
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub terminal: Option<ProcessTerminalSemantics>,
67    #[serde(default, skip_serializing_if = "Option::is_none")]
68    pub wake: Option<ProcessWake>,
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "snake_case")]
73pub enum ProcessTerminalState {
74    Completed,
75    Failed,
76    Cancelled,
77}
78
79#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
80pub struct ProcessTerminalSemantics {
81    pub state: ProcessTerminalState,
82    pub await_output: ProcessAwaitOutput,
83}
84
85#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
86#[serde(tag = "type", rename_all = "snake_case")]
87pub enum ProcessAwaitOutput {
88    Success {
89        value: serde_json::Value,
90        #[serde(default, skip_serializing_if = "Option::is_none")]
91        control: Option<crate::ToolControl>,
92    },
93    Failure {
94        class: crate::ToolFailureClass,
95        code: String,
96        message: String,
97        #[serde(default, skip_serializing_if = "Option::is_none")]
98        raw: Option<serde_json::Value>,
99        #[serde(default, skip_serializing_if = "Option::is_none")]
100        control: Option<crate::ToolControl>,
101    },
102    Cancelled {
103        message: String,
104        #[serde(default, skip_serializing_if = "Option::is_none")]
105        raw: Option<serde_json::Value>,
106        #[serde(default, skip_serializing_if = "Option::is_none")]
107        control: Option<crate::ToolControl>,
108    },
109}
110
111impl ProcessAwaitOutput {
112    pub fn terminal_state(&self) -> ProcessTerminalState {
113        match self {
114            Self::Success { .. } => ProcessTerminalState::Completed,
115            Self::Failure { .. } => ProcessTerminalState::Failed,
116            Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
117        }
118    }
119
120    pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
121        let control = output.control;
122        match output.outcome {
123            crate::ToolCallOutcome::Success(value) => Self::Success {
124                value: value.to_json_value(),
125                control,
126            },
127            crate::ToolCallOutcome::Failure(failure) => Self::Failure {
128                class: failure.class,
129                code: failure.code,
130                message: failure.message,
131                raw: failure.raw.map(|value| value.to_json_value()),
132                control,
133            },
134            crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
135                message: cancellation.message,
136                raw: cancellation.raw.map(|value| value.to_json_value()),
137                control,
138            },
139        }
140    }
141
142    pub fn into_tool_output(self) -> crate::ToolCallOutput {
143        match self {
144            Self::Success { value, control } => {
145                let mut output = crate::ToolCallOutput::success(value);
146                output.control = control;
147                output
148            }
149            Self::Failure {
150                class,
151                code,
152                message,
153                raw,
154                control,
155            } => {
156                let mut failure = crate::ToolFailure::tool(class, code, message);
157                failure.raw = raw.map(crate::ToolValue::from);
158                let mut output = crate::ToolCallOutput::failure(failure);
159                output.control = control;
160                output
161            }
162            Self::Cancelled {
163                message,
164                raw,
165                control,
166            } => {
167                let mut cancellation = crate::ToolCancellation::runtime(message);
168                cancellation.raw = raw.map(crate::ToolValue::from);
169                let mut output = crate::ToolCallOutput::cancelled(cancellation);
170                output.control = control;
171                output
172            }
173        }
174    }
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct ProcessWake {
179    pub input: String,
180    pub dedupe_key: String,
181}
182
183pub fn lashlang_process_event_types() -> Vec<ProcessEventType> {
184    vec![
185        ProcessEventType {
186            name: "process.yield".to_string(),
187            payload_schema: crate::LashSchema::any(),
188            semantics: ProcessEventSemanticsSpec::default(),
189        },
190        ProcessEventType {
191            name: "process.wake".to_string(),
192            payload_schema: crate::LashSchema::any(),
193            semantics: ProcessEventSemanticsSpec {
194                wake: Some(ProcessWakeSpec {
195                    when: None,
196                    input: ProcessValueSelector::Pointer("/text".to_string()),
197                    dedupe_key: ProcessWakeDedupeKey::EventIdentity,
198                }),
199                ..ProcessEventSemanticsSpec::default()
200            },
201        },
202        ProcessEventType {
203            name: "process.signal".to_string(),
204            payload_schema: crate::LashSchema::any(),
205            semantics: ProcessEventSemanticsSpec::default(),
206        },
207    ]
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize)]
211pub struct ProcessEvent {
212    pub process_id: ProcessId,
213    pub sequence: u64,
214    pub event_type: String,
215    pub payload: serde_json::Value,
216    pub invocation: crate::RuntimeInvocation,
217    pub semantics: ProcessEventSemantics,
218    pub occurred_at: SystemTime,
219}
220
221#[derive(Clone, Debug, Serialize, Deserialize)]
222pub struct ProcessEventAppendResult {
223    pub event: ProcessEvent,
224    #[serde(default, skip_serializing_if = "Option::is_none")]
225    pub wake_delivery: Option<ProcessWakeDelivery>,
226}
227
228#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
229pub struct ProcessEventAppendRequest {
230    pub event_type: String,
231    pub payload: serde_json::Value,
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub replay: Option<crate::RuntimeReplay>,
234    #[serde(default, skip_serializing_if = "Option::is_none")]
235    pub wake_target_scope: Option<ProcessScope>,
236}
237
238impl ProcessEventAppendRequest {
239    pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
240        Self {
241            event_type: event_type.into(),
242            payload,
243            replay: None,
244            wake_target_scope: None,
245        }
246    }
247
248    pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
249        self.replay = Some(crate::RuntimeReplay {
250            key: replay_key.into(),
251        });
252        self
253    }
254
255    pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
256        self.replay = replay;
257        self
258    }
259
260    pub fn with_wake_target_scope(mut self, scope: ProcessScope) -> Self {
261        self.wake_target_scope = Some(scope);
262        self
263    }
264
265    pub fn with_optional_wake_target_scope(mut self, scope: Option<ProcessScope>) -> Self {
266        self.wake_target_scope = scope;
267        self
268    }
269
270    pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
271        let payload = serde_json::json!({
272            "reason": reason,
273        });
274        let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
275            .unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
276        Self::new("process.cancel_requested", payload).with_replay_key(format!(
277            "process:{process_id}:cancel_requested:{replay_key}"
278        ))
279    }
280}
281
282#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
283pub struct ProcessWakeDelivery {
284    pub wake_id: String,
285    pub target_session_id: String,
286    pub target_scope_id: ProcessScopeId,
287    pub process_id: ProcessId,
288    pub sequence: u64,
289    #[serde(default = "default_process_wake_event_type")]
290    pub event_type: String,
291    #[serde(default = "default_process_wake_event_invocation")]
292    pub event_invocation: crate::RuntimeInvocation,
293    #[serde(default, skip_serializing_if = "Option::is_none")]
294    pub process_caused_by: Option<crate::CausalRef>,
295    pub dedupe_key: String,
296    pub input: String,
297    pub created_at_ms: u64,
298}
299
/// Serde default for `ProcessWakeDelivery::event_type`: `"process.wake"`.
fn default_process_wake_event_type() -> String {
    String::from("process.wake")
}
303
304fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
305    crate::RuntimeInvocation {
306        scope: crate::RuntimeScope::new(""),
307        subject: crate::RuntimeSubject::ProcessEvent {
308            process_id: String::new(),
309            sequence: 0,
310            event_type: default_process_wake_event_type(),
311        },
312        caused_by: None,
313        replay: None,
314    }
315}
316
317pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
318    vec![
319        ProcessEventType {
320            name: "process.cancel_requested".to_string(),
321            payload_schema: crate::LashSchema::any(),
322            semantics: ProcessEventSemanticsSpec::default(),
323        },
324        terminal_event_type("process.completed", ProcessTerminalState::Completed),
325        terminal_event_type("process.failed", ProcessTerminalState::Failed),
326        terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
327    ]
328}
329
330fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
331    ProcessEventType {
332        name: name.to_string(),
333        payload_schema: crate::LashSchema::any(),
334        semantics: ProcessEventSemanticsSpec {
335            terminal: Some(ProcessTerminalSpec {
336                state,
337                await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
338            }),
339            ..ProcessEventSemanticsSpec::default()
340        },
341    }
342}