1use std::collections::BTreeMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5
6use super::model::{ProcessId, ProcessScope, ProcessScopeId};
7use super::validation::process_event_payload_hash;
8
9#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
10pub struct ProcessEventType {
11 pub name: String,
12 pub payload_schema: crate::LashSchema,
13 pub semantics: ProcessEventSemanticsSpec,
14}
15
16#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
17pub struct ProcessEventSemanticsSpec {
18 #[serde(default, skip_serializing_if = "Option::is_none")]
19 pub terminal: Option<ProcessTerminalSpec>,
20 #[serde(default, skip_serializing_if = "Option::is_none")]
21 pub wake: Option<ProcessWakeSpec>,
22}
23
24#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
25pub struct ProcessTerminalSpec {
26 pub state: ProcessTerminalState,
27 #[serde(default, skip_serializing_if = "Option::is_none")]
28 pub await_output: Option<ProcessValueSelector>,
29}
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub struct ProcessWakeSpec {
33 #[serde(default, skip_serializing_if = "Option::is_none")]
34 pub when: Option<ProcessValueSelector>,
35 pub input: ProcessValueSelector,
36 #[serde(default)]
37 pub dedupe_key: ProcessWakeDedupeKey,
38}
39
40#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum ProcessWakeDedupeKey {
43 #[default]
44 EventIdentity,
45 Selector(ProcessValueSelector),
46 Const(String),
47}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum ProcessValueSelector {
52 Payload,
53 Pointer(String),
54 Const(serde_json::Value),
55 Template {
56 template: String,
57 #[serde(default)]
58 fields: BTreeMap<String, ProcessValueSelector>,
59 },
60 Present(String),
61}
62
63#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
64pub struct ProcessEventSemantics {
65 #[serde(default, skip_serializing_if = "Option::is_none")]
66 pub terminal: Option<ProcessTerminalSemantics>,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub wake: Option<ProcessWake>,
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "snake_case")]
73pub enum ProcessTerminalState {
74 Completed,
75 Failed,
76 Cancelled,
77}
78
79#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
80pub struct ProcessTerminalSemantics {
81 pub state: ProcessTerminalState,
82 pub await_output: ProcessAwaitOutput,
83}
84
85#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
86#[serde(tag = "type", rename_all = "snake_case")]
87pub enum ProcessAwaitOutput {
88 Success {
89 value: serde_json::Value,
90 #[serde(default, skip_serializing_if = "Option::is_none")]
91 control: Option<crate::ToolControl>,
92 },
93 Failure {
94 class: crate::ToolFailureClass,
95 code: String,
96 message: String,
97 #[serde(default, skip_serializing_if = "Option::is_none")]
98 raw: Option<serde_json::Value>,
99 #[serde(default, skip_serializing_if = "Option::is_none")]
100 control: Option<crate::ToolControl>,
101 },
102 Cancelled {
103 message: String,
104 #[serde(default, skip_serializing_if = "Option::is_none")]
105 raw: Option<serde_json::Value>,
106 #[serde(default, skip_serializing_if = "Option::is_none")]
107 control: Option<crate::ToolControl>,
108 },
109}
110
111impl ProcessAwaitOutput {
112 pub fn terminal_state(&self) -> ProcessTerminalState {
113 match self {
114 Self::Success { .. } => ProcessTerminalState::Completed,
115 Self::Failure { .. } => ProcessTerminalState::Failed,
116 Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
117 }
118 }
119
120 pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
121 let control = output.control;
122 match output.outcome {
123 crate::ToolCallOutcome::Success(value) => Self::Success {
124 value: value.to_json_value(),
125 control,
126 },
127 crate::ToolCallOutcome::Failure(failure) => Self::Failure {
128 class: failure.class,
129 code: failure.code,
130 message: failure.message,
131 raw: failure.raw.map(|value| value.to_json_value()),
132 control,
133 },
134 crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
135 message: cancellation.message,
136 raw: cancellation.raw.map(|value| value.to_json_value()),
137 control,
138 },
139 }
140 }
141
142 pub fn into_tool_output(self) -> crate::ToolCallOutput {
143 match self {
144 Self::Success { value, control } => {
145 let mut output = crate::ToolCallOutput::success(value);
146 output.control = control;
147 output
148 }
149 Self::Failure {
150 class,
151 code,
152 message,
153 raw,
154 control,
155 } => {
156 let mut failure = crate::ToolFailure::tool(class, code, message);
157 failure.raw = raw.map(crate::ToolValue::from);
158 let mut output = crate::ToolCallOutput::failure(failure);
159 output.control = control;
160 output
161 }
162 Self::Cancelled {
163 message,
164 raw,
165 control,
166 } => {
167 let mut cancellation = crate::ToolCancellation::runtime(message);
168 cancellation.raw = raw.map(crate::ToolValue::from);
169 let mut output = crate::ToolCallOutput::cancelled(cancellation);
170 output.control = control;
171 output
172 }
173 }
174 }
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct ProcessWake {
179 pub input: String,
180 pub dedupe_key: String,
181}
182
183pub fn lashlang_process_event_types() -> Vec<ProcessEventType> {
184 vec![
185 ProcessEventType {
186 name: "process.yield".to_string(),
187 payload_schema: crate::LashSchema::any(),
188 semantics: ProcessEventSemanticsSpec::default(),
189 },
190 ProcessEventType {
191 name: "process.wake".to_string(),
192 payload_schema: crate::LashSchema::any(),
193 semantics: ProcessEventSemanticsSpec {
194 wake: Some(ProcessWakeSpec {
195 when: None,
196 input: ProcessValueSelector::Pointer("/text".to_string()),
197 dedupe_key: ProcessWakeDedupeKey::EventIdentity,
198 }),
199 ..ProcessEventSemanticsSpec::default()
200 },
201 },
202 ProcessEventType {
203 name: "process.signal".to_string(),
204 payload_schema: crate::LashSchema::any(),
205 semantics: ProcessEventSemanticsSpec::default(),
206 },
207 ]
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize)]
211pub struct ProcessEvent {
212 pub process_id: ProcessId,
213 pub sequence: u64,
214 pub event_type: String,
215 pub payload: serde_json::Value,
216 pub invocation: crate::RuntimeInvocation,
217 pub semantics: ProcessEventSemantics,
218 pub occurred_at: SystemTime,
219}
220
221#[derive(Clone, Debug, Serialize, Deserialize)]
222pub struct ProcessEventAppendResult {
223 pub event: ProcessEvent,
224 #[serde(default, skip_serializing_if = "Option::is_none")]
225 pub wake_delivery: Option<ProcessWakeDelivery>,
226}
227
228#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
229pub struct ProcessEventAppendRequest {
230 pub event_type: String,
231 pub payload: serde_json::Value,
232 #[serde(default, skip_serializing_if = "Option::is_none")]
233 pub replay: Option<crate::RuntimeReplay>,
234 #[serde(default, skip_serializing_if = "Option::is_none")]
235 pub wake_target_scope: Option<ProcessScope>,
236}
237
238impl ProcessEventAppendRequest {
239 pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
240 Self {
241 event_type: event_type.into(),
242 payload,
243 replay: None,
244 wake_target_scope: None,
245 }
246 }
247
248 pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
249 self.replay = Some(crate::RuntimeReplay {
250 key: replay_key.into(),
251 });
252 self
253 }
254
255 pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
256 self.replay = replay;
257 self
258 }
259
260 pub fn with_wake_target_scope(mut self, scope: ProcessScope) -> Self {
261 self.wake_target_scope = Some(scope);
262 self
263 }
264
265 pub fn with_optional_wake_target_scope(mut self, scope: Option<ProcessScope>) -> Self {
266 self.wake_target_scope = scope;
267 self
268 }
269
270 pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
271 let payload = serde_json::json!({
272 "reason": reason,
273 });
274 let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
275 .unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
276 Self::new("process.cancel_requested", payload).with_replay_key(format!(
277 "process:{process_id}:cancel_requested:{replay_key}"
278 ))
279 }
280}
281
282#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
283pub struct ProcessWakeDelivery {
284 pub wake_id: String,
285 pub target_session_id: String,
286 pub target_scope_id: ProcessScopeId,
287 pub process_id: ProcessId,
288 pub sequence: u64,
289 #[serde(default = "default_process_wake_event_type")]
290 pub event_type: String,
291 #[serde(default = "default_process_wake_event_invocation")]
292 pub event_invocation: crate::RuntimeInvocation,
293 #[serde(default, skip_serializing_if = "Option::is_none")]
294 pub process_caused_by: Option<crate::CausalRef>,
295 pub dedupe_key: String,
296 pub input: String,
297 pub created_at_ms: u64,
298}
299
/// Serde default for [`ProcessWakeDelivery::event_type`]: `process.wake`.
fn default_process_wake_event_type() -> String {
    String::from("process.wake")
}
303
304fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
305 crate::RuntimeInvocation {
306 scope: crate::RuntimeScope::new(""),
307 subject: crate::RuntimeSubject::ProcessEvent {
308 process_id: String::new(),
309 sequence: 0,
310 event_type: default_process_wake_event_type(),
311 },
312 caused_by: None,
313 replay: None,
314 }
315}
316
317pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
318 vec![
319 ProcessEventType {
320 name: "process.cancel_requested".to_string(),
321 payload_schema: crate::LashSchema::any(),
322 semantics: ProcessEventSemanticsSpec::default(),
323 },
324 terminal_event_type("process.completed", ProcessTerminalState::Completed),
325 terminal_event_type("process.failed", ProcessTerminalState::Failed),
326 terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
327 ]
328}
329
330fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
331 ProcessEventType {
332 name: name.to_string(),
333 payload_schema: crate::LashSchema::any(),
334 semantics: ProcessEventSemanticsSpec {
335 terminal: Some(ProcessTerminalSpec {
336 state,
337 await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
338 }),
339 ..ProcessEventSemanticsSpec::default()
340 },
341 }
342}