Skip to main content

lash_core/runtime/process/
events.rs

1use std::collections::BTreeMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5
6use super::model::{ProcessId, SessionScope, SessionScopeId};
7use super::validation::process_event_payload_hash;
8
9#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
10pub struct ProcessEventType {
11    pub name: String,
12    pub payload_schema: crate::LashSchema,
13    pub semantics: ProcessEventSemanticsSpec,
14}
15
16#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
17pub struct ProcessEventSemanticsSpec {
18    #[serde(default, skip_serializing_if = "Option::is_none")]
19    pub terminal: Option<ProcessTerminalSpec>,
20    #[serde(default, skip_serializing_if = "Option::is_none")]
21    pub wake: Option<ProcessWakeSpec>,
22}
23
24#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
25pub struct ProcessTerminalSpec {
26    pub state: ProcessTerminalState,
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    pub await_output: Option<ProcessValueSelector>,
29}
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub struct ProcessWakeSpec {
33    #[serde(default, skip_serializing_if = "Option::is_none")]
34    pub when: Option<ProcessValueSelector>,
35    pub input: ProcessValueSelector,
36    #[serde(default)]
37    pub dedupe_key: ProcessWakeDedupeKey,
38}
39
40#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum ProcessWakeDedupeKey {
43    #[default]
44    EventIdentity,
45    Selector(ProcessValueSelector),
46    Const(String),
47}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum ProcessValueSelector {
52    Payload,
53    Pointer(String),
54    Const(serde_json::Value),
55    Template {
56        template: String,
57        #[serde(default)]
58        fields: BTreeMap<String, ProcessValueSelector>,
59    },
60    Present(String),
61}
62
63#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
64pub struct ProcessEventSemantics {
65    #[serde(default, skip_serializing_if = "Option::is_none")]
66    pub terminal: Option<ProcessTerminalSemantics>,
67    #[serde(default, skip_serializing_if = "Option::is_none")]
68    pub wake: Option<ProcessWake>,
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "snake_case")]
73pub enum ProcessTerminalState {
74    Completed,
75    Failed,
76    Cancelled,
77}
78
79#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
80pub struct ProcessTerminalSemantics {
81    pub state: ProcessTerminalState,
82    pub await_output: ProcessAwaitOutput,
83}
84
85#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
86#[serde(tag = "type", rename_all = "snake_case")]
87pub enum ProcessAwaitOutput {
88    Success {
89        value: serde_json::Value,
90        #[serde(default, skip_serializing_if = "Option::is_none")]
91        control: Option<crate::ToolControl>,
92    },
93    Failure {
94        class: crate::ToolFailureClass,
95        code: String,
96        message: String,
97        #[serde(default, skip_serializing_if = "Option::is_none")]
98        raw: Option<serde_json::Value>,
99        #[serde(default, skip_serializing_if = "Option::is_none")]
100        control: Option<crate::ToolControl>,
101    },
102    Cancelled {
103        message: String,
104        #[serde(default, skip_serializing_if = "Option::is_none")]
105        raw: Option<serde_json::Value>,
106        #[serde(default, skip_serializing_if = "Option::is_none")]
107        control: Option<crate::ToolControl>,
108    },
109}
110
111impl ProcessAwaitOutput {
112    pub fn terminal_state(&self) -> ProcessTerminalState {
113        match self {
114            Self::Success { .. } => ProcessTerminalState::Completed,
115            Self::Failure { .. } => ProcessTerminalState::Failed,
116            Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
117        }
118    }
119
120    pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
121        let control = output.control;
122        match output.outcome {
123            crate::ToolCallOutcome::Success(value) => Self::Success {
124                value: value.to_json_value(),
125                control,
126            },
127            crate::ToolCallOutcome::Failure(failure) => Self::Failure {
128                class: failure.class,
129                code: failure.code,
130                message: failure.message,
131                raw: failure.raw.map(|value| value.to_json_value()),
132                control,
133            },
134            crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
135                message: cancellation.message,
136                raw: cancellation.raw.map(|value| value.to_json_value()),
137                control,
138            },
139        }
140    }
141
142    pub fn into_tool_output(self) -> crate::ToolCallOutput {
143        match self {
144            Self::Success { value, control } => {
145                let mut output = crate::ToolCallOutput::success(value);
146                output.control = control;
147                output
148            }
149            Self::Failure {
150                class,
151                code,
152                message,
153                raw,
154                control,
155            } => {
156                let mut failure = crate::ToolFailure::tool(class, code, message);
157                failure.raw = raw.map(crate::ToolValue::from);
158                let mut output = crate::ToolCallOutput::failure(failure);
159                output.control = control;
160                output
161            }
162            Self::Cancelled {
163                message,
164                raw,
165                control,
166            } => {
167                let mut cancellation = crate::ToolCancellation::runtime(message);
168                cancellation.raw = raw.map(crate::ToolValue::from);
169                let mut output = crate::ToolCallOutput::cancelled(cancellation);
170                output.control = control;
171                output
172            }
173        }
174    }
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct ProcessWake {
179    pub input: String,
180    pub dedupe_key: String,
181}
182
183pub fn process_signal_event_type(signal_name: &str) -> Result<String, crate::PluginError> {
184    validate_process_signal_name(signal_name)?;
185    Ok(format!("signal.{signal_name}"))
186}
187
188pub fn process_signal_name_from_event_type(event_type: &str) -> Option<&str> {
189    event_type.strip_prefix("signal.")
190}
191
192pub fn process_signal_wait_key(process_id: &str, signal_name: &str, ordinal: u64) -> String {
193    format!("process:{process_id}:signal.{signal_name}:{ordinal}")
194}
195
196pub fn validate_process_signal_name(signal_name: &str) -> Result<(), crate::PluginError> {
197    let valid = !signal_name.is_empty()
198        && signal_name
199            .chars()
200            .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-');
201    if valid {
202        Ok(())
203    } else {
204        Err(crate::PluginError::Session(format!(
205            "process signal name must be non-empty and contain only ASCII letters, digits, `_`, or `-`, got `{signal_name}`"
206        )))
207    }
208}
209
210pub fn lashlang_process_event_types() -> Vec<ProcessEventType> {
211    vec![
212        ProcessEventType {
213            name: "process.yield".to_string(),
214            payload_schema: crate::LashSchema::any(),
215            semantics: ProcessEventSemanticsSpec::default(),
216        },
217        ProcessEventType {
218            name: "process.wake".to_string(),
219            payload_schema: crate::LashSchema::any(),
220            semantics: ProcessEventSemanticsSpec {
221                wake: Some(ProcessWakeSpec {
222                    when: None,
223                    input: ProcessValueSelector::Pointer("/text".to_string()),
224                    dedupe_key: ProcessWakeDedupeKey::EventIdentity,
225                }),
226                ..ProcessEventSemanticsSpec::default()
227            },
228        },
229    ]
230}
231
232pub fn lashlang_process_signal_event_types(
233    process: &lashlang::ProcessDecl,
234) -> Vec<ProcessEventType> {
235    process
236        .signals
237        .iter()
238        .map(|signal| ProcessEventType {
239            name: process_signal_event_type(signal.name.as_str())
240                .expect("lashlang process signal declarations use parser-validated names"),
241            payload_schema: crate::LashSchema::new(lashlang_type_expr_schema(&signal.ty)),
242            semantics: ProcessEventSemanticsSpec::default(),
243        })
244        .collect()
245}
246
247fn lashlang_type_expr_schema(ty: &lashlang::TypeExpr) -> serde_json::Value {
248    match ty {
249        lashlang::TypeExpr::Any
250        | lashlang::TypeExpr::Dict
251        | lashlang::TypeExpr::Ref(_)
252        | lashlang::TypeExpr::Process { .. }
253        | lashlang::TypeExpr::TriggerHandle(_) => serde_json::json!({}),
254        lashlang::TypeExpr::Str => serde_json::json!({ "type": "string" }),
255        lashlang::TypeExpr::Int => serde_json::json!({ "type": "integer" }),
256        lashlang::TypeExpr::Float => serde_json::json!({ "type": "number" }),
257        lashlang::TypeExpr::Bool => serde_json::json!({ "type": "boolean" }),
258        lashlang::TypeExpr::Null => serde_json::json!({ "type": "null" }),
259        lashlang::TypeExpr::Enum(values) => serde_json::json!({
260            "enum": values.iter().map(|value| value.as_str()).collect::<Vec<_>>()
261        }),
262        lashlang::TypeExpr::List(item) => serde_json::json!({
263            "type": "array",
264            "items": lashlang_type_expr_schema(item),
265        }),
266        lashlang::TypeExpr::Object(fields) => {
267            let mut properties = serde_json::Map::new();
268            let mut required = Vec::new();
269            for field in fields {
270                properties.insert(field.name.to_string(), lashlang_type_expr_schema(&field.ty));
271                if !field.optional {
272                    required.push(serde_json::Value::String(field.name.to_string()));
273                }
274            }
275            let mut schema = serde_json::Map::new();
276            schema.insert(
277                "type".to_string(),
278                serde_json::Value::String("object".to_string()),
279            );
280            schema.insert(
281                "properties".to_string(),
282                serde_json::Value::Object(properties),
283            );
284            if !required.is_empty() {
285                schema.insert("required".to_string(), serde_json::Value::Array(required));
286            }
287            schema.insert(
288                "additionalProperties".to_string(),
289                serde_json::Value::Bool(true),
290            );
291            serde_json::Value::Object(schema)
292        }
293        lashlang::TypeExpr::Union(variants) => serde_json::json!({
294            "anyOf": variants.iter().map(lashlang_type_expr_schema).collect::<Vec<_>>()
295        }),
296    }
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize)]
300pub struct ProcessEvent {
301    pub process_id: ProcessId,
302    pub sequence: u64,
303    pub event_type: String,
304    pub payload: serde_json::Value,
305    pub invocation: crate::RuntimeInvocation,
306    pub semantics: ProcessEventSemantics,
307    pub occurred_at: SystemTime,
308}
309
310#[derive(Clone, Debug, Serialize, Deserialize)]
311pub struct ProcessEventAppendResult {
312    pub event: ProcessEvent,
313    #[serde(default, skip_serializing_if = "Option::is_none")]
314    pub wake_delivery: Option<ProcessWakeDelivery>,
315}
316
317#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
318pub struct ProcessEventAppendRequest {
319    pub event_type: String,
320    pub payload: serde_json::Value,
321    #[serde(default, skip_serializing_if = "Option::is_none")]
322    pub replay: Option<crate::RuntimeReplay>,
323    #[serde(default, skip_serializing_if = "Option::is_none")]
324    pub wake_target_scope: Option<SessionScope>,
325}
326
327impl ProcessEventAppendRequest {
328    pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
329        Self {
330            event_type: event_type.into(),
331            payload,
332            replay: None,
333            wake_target_scope: None,
334        }
335    }
336
337    pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
338        self.replay = Some(crate::RuntimeReplay {
339            key: replay_key.into(),
340        });
341        self
342    }
343
344    pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
345        self.replay = replay;
346        self
347    }
348
349    pub fn with_wake_target_scope(mut self, scope: SessionScope) -> Self {
350        self.wake_target_scope = Some(scope);
351        self
352    }
353
354    pub fn with_optional_wake_target_scope(mut self, scope: Option<SessionScope>) -> Self {
355        self.wake_target_scope = scope;
356        self
357    }
358
359    pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
360        let payload = serde_json::json!({
361            "reason": reason,
362        });
363        let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
364            .unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
365        Self::new("process.cancel_requested", payload).with_replay_key(format!(
366            "process:{process_id}:cancel_requested:{replay_key}"
367        ))
368    }
369}
370
371#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
372pub struct ProcessWakeDelivery {
373    pub wake_id: String,
374    pub target_session_id: String,
375    pub target_scope_id: SessionScopeId,
376    pub process_id: ProcessId,
377    pub sequence: u64,
378    #[serde(default = "default_process_wake_event_type")]
379    pub event_type: String,
380    #[serde(default = "default_process_wake_event_invocation")]
381    pub event_invocation: crate::RuntimeInvocation,
382    #[serde(default, skip_serializing_if = "Option::is_none")]
383    pub process_caused_by: Option<crate::CausalRef>,
384    pub dedupe_key: String,
385    pub input: String,
386    pub created_at_ms: u64,
387}
388
389fn default_process_wake_event_type() -> String {
390    "process.wake".to_string()
391}
392
393fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
394    crate::RuntimeInvocation {
395        scope: crate::RuntimeScope::new(""),
396        subject: crate::RuntimeSubject::ProcessEvent {
397            process_id: String::new(),
398            sequence: 0,
399            event_type: default_process_wake_event_type(),
400        },
401        caused_by: None,
402        replay: None,
403    }
404}
405
406pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
407    vec![
408        ProcessEventType {
409            name: "process.cancel_requested".to_string(),
410            payload_schema: crate::LashSchema::any(),
411            semantics: ProcessEventSemanticsSpec::default(),
412        },
413        ProcessEventType {
414            name: "process.waiting".to_string(),
415            payload_schema: crate::LashSchema::any(),
416            semantics: ProcessEventSemanticsSpec::default(),
417        },
418        ProcessEventType {
419            name: "process.resumed".to_string(),
420            payload_schema: crate::LashSchema::any(),
421            semantics: ProcessEventSemanticsSpec::default(),
422        },
423        terminal_event_type("process.completed", ProcessTerminalState::Completed),
424        terminal_event_type("process.failed", ProcessTerminalState::Failed),
425        terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
426    ]
427}
428
429fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
430    ProcessEventType {
431        name: name.to_string(),
432        payload_schema: crate::LashSchema::any(),
433        semantics: ProcessEventSemanticsSpec {
434            terminal: Some(ProcessTerminalSpec {
435                state,
436                await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
437            }),
438            ..ProcessEventSemanticsSpec::default()
439        },
440    }
441}