1use std::collections::BTreeMap;
2use std::time::SystemTime;
3
4use serde::{Deserialize, Serialize};
5
6use super::model::{ProcessId, SessionScope, SessionScopeId};
7use super::validation::process_event_payload_hash;
8
9#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
10pub struct ProcessEventType {
11 pub name: String,
12 pub payload_schema: crate::LashSchema,
13 pub semantics: ProcessEventSemanticsSpec,
14}
15
16#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
17pub struct ProcessEventSemanticsSpec {
18 #[serde(default, skip_serializing_if = "Option::is_none")]
19 pub terminal: Option<ProcessTerminalSpec>,
20 #[serde(default, skip_serializing_if = "Option::is_none")]
21 pub wake: Option<ProcessWakeSpec>,
22}
23
24#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
25pub struct ProcessTerminalSpec {
26 pub state: ProcessTerminalState,
27 #[serde(default, skip_serializing_if = "Option::is_none")]
28 pub await_output: Option<ProcessValueSelector>,
29}
30
31#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
32pub struct ProcessWakeSpec {
33 #[serde(default, skip_serializing_if = "Option::is_none")]
34 pub when: Option<ProcessValueSelector>,
35 pub input: ProcessValueSelector,
36 #[serde(default)]
37 pub dedupe_key: ProcessWakeDedupeKey,
38}
39
40#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
41#[serde(rename_all = "snake_case")]
42pub enum ProcessWakeDedupeKey {
43 #[default]
44 EventIdentity,
45 Selector(ProcessValueSelector),
46 Const(String),
47}
48
49#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum ProcessValueSelector {
52 Payload,
53 Pointer(String),
54 Const(serde_json::Value),
55 Template {
56 template: String,
57 #[serde(default)]
58 fields: BTreeMap<String, ProcessValueSelector>,
59 },
60 Present(String),
61}
62
63#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
64pub struct ProcessEventSemantics {
65 #[serde(default, skip_serializing_if = "Option::is_none")]
66 pub terminal: Option<ProcessTerminalSemantics>,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub wake: Option<ProcessWake>,
69}
70
71#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
72#[serde(rename_all = "snake_case")]
73pub enum ProcessTerminalState {
74 Completed,
75 Failed,
76 Cancelled,
77}
78
79#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
80pub struct ProcessTerminalSemantics {
81 pub state: ProcessTerminalState,
82 pub await_output: ProcessAwaitOutput,
83}
84
85#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
86#[serde(tag = "type", rename_all = "snake_case")]
87pub enum ProcessAwaitOutput {
88 Success {
89 value: serde_json::Value,
90 #[serde(default, skip_serializing_if = "Option::is_none")]
91 control: Option<crate::ToolControl>,
92 },
93 Failure {
94 class: crate::ToolFailureClass,
95 code: String,
96 message: String,
97 #[serde(default, skip_serializing_if = "Option::is_none")]
98 raw: Option<serde_json::Value>,
99 #[serde(default, skip_serializing_if = "Option::is_none")]
100 control: Option<crate::ToolControl>,
101 },
102 Cancelled {
103 message: String,
104 #[serde(default, skip_serializing_if = "Option::is_none")]
105 raw: Option<serde_json::Value>,
106 #[serde(default, skip_serializing_if = "Option::is_none")]
107 control: Option<crate::ToolControl>,
108 },
109}
110
111impl ProcessAwaitOutput {
112 pub fn terminal_state(&self) -> ProcessTerminalState {
113 match self {
114 Self::Success { .. } => ProcessTerminalState::Completed,
115 Self::Failure { .. } => ProcessTerminalState::Failed,
116 Self::Cancelled { .. } => ProcessTerminalState::Cancelled,
117 }
118 }
119
120 pub fn from_tool_output(output: crate::ToolCallOutput) -> Self {
121 let control = output.control;
122 match output.outcome {
123 crate::ToolCallOutcome::Success(value) => Self::Success {
124 value: value.to_json_value(),
125 control,
126 },
127 crate::ToolCallOutcome::Failure(failure) => Self::Failure {
128 class: failure.class,
129 code: failure.code,
130 message: failure.message,
131 raw: failure.raw.map(|value| value.to_json_value()),
132 control,
133 },
134 crate::ToolCallOutcome::Cancelled(cancellation) => Self::Cancelled {
135 message: cancellation.message,
136 raw: cancellation.raw.map(|value| value.to_json_value()),
137 control,
138 },
139 }
140 }
141
142 pub fn into_tool_output(self) -> crate::ToolCallOutput {
143 match self {
144 Self::Success { value, control } => {
145 let mut output = crate::ToolCallOutput::success(value);
146 output.control = control;
147 output
148 }
149 Self::Failure {
150 class,
151 code,
152 message,
153 raw,
154 control,
155 } => {
156 let mut failure = crate::ToolFailure::tool(class, code, message);
157 failure.raw = raw.map(crate::ToolValue::from);
158 let mut output = crate::ToolCallOutput::failure(failure);
159 output.control = control;
160 output
161 }
162 Self::Cancelled {
163 message,
164 raw,
165 control,
166 } => {
167 let mut cancellation = crate::ToolCancellation::runtime(message);
168 cancellation.raw = raw.map(crate::ToolValue::from);
169 let mut output = crate::ToolCallOutput::cancelled(cancellation);
170 output.control = control;
171 output
172 }
173 }
174 }
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178pub struct ProcessWake {
179 pub input: String,
180 pub dedupe_key: String,
181}
182
183pub fn process_signal_event_type(signal_name: &str) -> Result<String, crate::PluginError> {
184 validate_process_signal_name(signal_name)?;
185 Ok(format!("signal.{signal_name}"))
186}
187
188pub fn process_signal_name_from_event_type(event_type: &str) -> Option<&str> {
189 event_type.strip_prefix("signal.")
190}
191
192pub fn process_signal_wait_key(process_id: &str, signal_name: &str, ordinal: u64) -> String {
193 format!("process:{process_id}:signal.{signal_name}:{ordinal}")
194}
195
196pub fn validate_process_signal_name(signal_name: &str) -> Result<(), crate::PluginError> {
197 let valid = !signal_name.is_empty()
198 && signal_name
199 .chars()
200 .all(|ch| ch.is_ascii_alphanumeric() || ch == '_' || ch == '-');
201 if valid {
202 Ok(())
203 } else {
204 Err(crate::PluginError::Session(format!(
205 "process signal name must be non-empty and contain only ASCII letters, digits, `_`, or `-`, got `{signal_name}`"
206 )))
207 }
208}
209
210pub fn lashlang_process_event_types() -> Vec<ProcessEventType> {
211 vec![
212 ProcessEventType {
213 name: "process.yield".to_string(),
214 payload_schema: crate::LashSchema::any(),
215 semantics: ProcessEventSemanticsSpec::default(),
216 },
217 ProcessEventType {
218 name: "process.wake".to_string(),
219 payload_schema: crate::LashSchema::any(),
220 semantics: ProcessEventSemanticsSpec {
221 wake: Some(ProcessWakeSpec {
222 when: None,
223 input: ProcessValueSelector::Pointer("/text".to_string()),
224 dedupe_key: ProcessWakeDedupeKey::EventIdentity,
225 }),
226 ..ProcessEventSemanticsSpec::default()
227 },
228 },
229 ]
230}
231
232pub fn lashlang_process_signal_event_types(
233 process: &lashlang::ProcessDecl,
234) -> Vec<ProcessEventType> {
235 process
236 .signals
237 .iter()
238 .map(|signal| ProcessEventType {
239 name: process_signal_event_type(signal.name.as_str())
240 .expect("lashlang process signal declarations use parser-validated names"),
241 payload_schema: crate::LashSchema::new(lashlang_type_expr_schema(&signal.ty)),
242 semantics: ProcessEventSemanticsSpec::default(),
243 })
244 .collect()
245}
246
247fn lashlang_type_expr_schema(ty: &lashlang::TypeExpr) -> serde_json::Value {
248 match ty {
249 lashlang::TypeExpr::Any
250 | lashlang::TypeExpr::Dict
251 | lashlang::TypeExpr::Ref(_)
252 | lashlang::TypeExpr::Process { .. }
253 | lashlang::TypeExpr::TriggerHandle(_) => serde_json::json!({}),
254 lashlang::TypeExpr::Str => serde_json::json!({ "type": "string" }),
255 lashlang::TypeExpr::Int => serde_json::json!({ "type": "integer" }),
256 lashlang::TypeExpr::Float => serde_json::json!({ "type": "number" }),
257 lashlang::TypeExpr::Bool => serde_json::json!({ "type": "boolean" }),
258 lashlang::TypeExpr::Null => serde_json::json!({ "type": "null" }),
259 lashlang::TypeExpr::Enum(values) => serde_json::json!({
260 "enum": values.iter().map(|value| value.as_str()).collect::<Vec<_>>()
261 }),
262 lashlang::TypeExpr::List(item) => serde_json::json!({
263 "type": "array",
264 "items": lashlang_type_expr_schema(item),
265 }),
266 lashlang::TypeExpr::Object(fields) => {
267 let mut properties = serde_json::Map::new();
268 let mut required = Vec::new();
269 for field in fields {
270 properties.insert(field.name.to_string(), lashlang_type_expr_schema(&field.ty));
271 if !field.optional {
272 required.push(serde_json::Value::String(field.name.to_string()));
273 }
274 }
275 let mut schema = serde_json::Map::new();
276 schema.insert(
277 "type".to_string(),
278 serde_json::Value::String("object".to_string()),
279 );
280 schema.insert(
281 "properties".to_string(),
282 serde_json::Value::Object(properties),
283 );
284 if !required.is_empty() {
285 schema.insert("required".to_string(), serde_json::Value::Array(required));
286 }
287 schema.insert(
288 "additionalProperties".to_string(),
289 serde_json::Value::Bool(true),
290 );
291 serde_json::Value::Object(schema)
292 }
293 lashlang::TypeExpr::Union(variants) => serde_json::json!({
294 "anyOf": variants.iter().map(lashlang_type_expr_schema).collect::<Vec<_>>()
295 }),
296 }
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize)]
300pub struct ProcessEvent {
301 pub process_id: ProcessId,
302 pub sequence: u64,
303 pub event_type: String,
304 pub payload: serde_json::Value,
305 pub invocation: crate::RuntimeInvocation,
306 pub semantics: ProcessEventSemantics,
307 pub occurred_at: SystemTime,
308}
309
310#[derive(Clone, Debug, Serialize, Deserialize)]
311pub struct ProcessEventAppendResult {
312 pub event: ProcessEvent,
313 #[serde(default, skip_serializing_if = "Option::is_none")]
314 pub wake_delivery: Option<ProcessWakeDelivery>,
315}
316
317#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
318pub struct ProcessEventAppendRequest {
319 pub event_type: String,
320 pub payload: serde_json::Value,
321 #[serde(default, skip_serializing_if = "Option::is_none")]
322 pub replay: Option<crate::RuntimeReplay>,
323 #[serde(default, skip_serializing_if = "Option::is_none")]
324 pub wake_target_scope: Option<SessionScope>,
325}
326
327impl ProcessEventAppendRequest {
328 pub fn new(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
329 Self {
330 event_type: event_type.into(),
331 payload,
332 replay: None,
333 wake_target_scope: None,
334 }
335 }
336
337 pub fn with_replay_key(mut self, replay_key: impl Into<String>) -> Self {
338 self.replay = Some(crate::RuntimeReplay {
339 key: replay_key.into(),
340 });
341 self
342 }
343
344 pub fn with_optional_replay(mut self, replay: Option<crate::RuntimeReplay>) -> Self {
345 self.replay = replay;
346 self
347 }
348
349 pub fn with_wake_target_scope(mut self, scope: SessionScope) -> Self {
350 self.wake_target_scope = Some(scope);
351 self
352 }
353
354 pub fn with_optional_wake_target_scope(mut self, scope: Option<SessionScope>) -> Self {
355 self.wake_target_scope = scope;
356 self
357 }
358
359 pub fn cancel_requested(process_id: &str, reason: Option<String>) -> Self {
360 let payload = serde_json::json!({
361 "reason": reason,
362 });
363 let replay_key = process_event_payload_hash("process.cancel_requested", &payload)
364 .unwrap_or_else(|_| format!("process:{process_id}:cancel_requested"));
365 Self::new("process.cancel_requested", payload).with_replay_key(format!(
366 "process:{process_id}:cancel_requested:{replay_key}"
367 ))
368 }
369}
370
371#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
372pub struct ProcessWakeDelivery {
373 pub wake_id: String,
374 pub target_session_id: String,
375 pub target_scope_id: SessionScopeId,
376 pub process_id: ProcessId,
377 pub sequence: u64,
378 #[serde(default = "default_process_wake_event_type")]
379 pub event_type: String,
380 #[serde(default = "default_process_wake_event_invocation")]
381 pub event_invocation: crate::RuntimeInvocation,
382 #[serde(default, skip_serializing_if = "Option::is_none")]
383 pub process_caused_by: Option<crate::CausalRef>,
384 pub dedupe_key: String,
385 pub input: String,
386 pub created_at_ms: u64,
387}
388
389fn default_process_wake_event_type() -> String {
390 "process.wake".to_string()
391}
392
393fn default_process_wake_event_invocation() -> crate::RuntimeInvocation {
394 crate::RuntimeInvocation {
395 scope: crate::RuntimeScope::new(""),
396 subject: crate::RuntimeSubject::ProcessEvent {
397 process_id: String::new(),
398 sequence: 0,
399 event_type: default_process_wake_event_type(),
400 },
401 caused_by: None,
402 replay: None,
403 }
404}
405
406pub(super) fn default_process_event_types() -> Vec<ProcessEventType> {
407 vec![
408 ProcessEventType {
409 name: "process.cancel_requested".to_string(),
410 payload_schema: crate::LashSchema::any(),
411 semantics: ProcessEventSemanticsSpec::default(),
412 },
413 ProcessEventType {
414 name: "process.waiting".to_string(),
415 payload_schema: crate::LashSchema::any(),
416 semantics: ProcessEventSemanticsSpec::default(),
417 },
418 ProcessEventType {
419 name: "process.resumed".to_string(),
420 payload_schema: crate::LashSchema::any(),
421 semantics: ProcessEventSemanticsSpec::default(),
422 },
423 terminal_event_type("process.completed", ProcessTerminalState::Completed),
424 terminal_event_type("process.failed", ProcessTerminalState::Failed),
425 terminal_event_type("process.cancelled", ProcessTerminalState::Cancelled),
426 ]
427}
428
429fn terminal_event_type(name: &str, state: ProcessTerminalState) -> ProcessEventType {
430 ProcessEventType {
431 name: name.to_string(),
432 payload_schema: crate::LashSchema::any(),
433 semantics: ProcessEventSemanticsSpec {
434 terminal: Some(ProcessTerminalSpec {
435 state,
436 await_output: Some(ProcessValueSelector::Pointer("/await_output".to_string())),
437 }),
438 ..ProcessEventSemanticsSpec::default()
439 },
440 }
441}