lash_core/runtime/process/
materialization.rs1use crate::plugin::PluginError;
2
3use super::events::{
4 ProcessAwaitOutput, ProcessEventSemantics, ProcessEventSemanticsSpec, ProcessTerminalSemantics,
5 ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector, ProcessWake,
6 ProcessWakeDedupeKey, ProcessWakeSpec,
7};
8
9pub fn materialize_process_event_semantics(
10 process_id: &str,
11 sequence: u64,
12 payload: &serde_json::Value,
13 spec: &ProcessEventSemanticsSpec,
14) -> Result<ProcessEventSemantics, PluginError> {
15 materialize_event_semantics(process_id, sequence, payload, spec)
16}
17
18pub(super) fn materialize_event_semantics(
19 process_id: &str,
20 sequence: u64,
21 payload: &serde_json::Value,
22 spec: &ProcessEventSemanticsSpec,
23) -> Result<ProcessEventSemantics, PluginError> {
24 let terminal = spec
25 .terminal
26 .as_ref()
27 .map(|terminal| materialize_terminal_semantics(payload, terminal))
28 .transpose()?;
29 let wake = spec
30 .wake
31 .as_ref()
32 .map(|wake| materialize_wake(process_id, sequence, payload, wake))
33 .transpose()?
34 .flatten();
35 Ok(ProcessEventSemantics { terminal, wake })
36}
37
38fn materialize_terminal_semantics(
39 payload: &serde_json::Value,
40 terminal: &ProcessTerminalSpec,
41) -> Result<ProcessTerminalSemantics, PluginError> {
42 let await_output = match &terminal.await_output {
43 Some(selector) => {
44 let selected = select_value(payload, selector)?;
45 serde_json::from_value::<ProcessAwaitOutput>(selected.clone())
46 .unwrap_or_else(|_| selected_value_to_await_output(terminal.state, selected))
47 }
48 None if terminal.state == ProcessTerminalState::Completed => ProcessAwaitOutput::Success {
49 value: payload.clone(),
50 control: None,
51 },
52 None => {
53 return Err(PluginError::Session(
54 "failed or cancelled terminal events must declare await output".to_string(),
55 ));
56 }
57 };
58 Ok(ProcessTerminalSemantics {
59 state: terminal.state,
60 await_output,
61 })
62}
63
64fn selected_value_to_await_output(
65 state: ProcessTerminalState,
66 value: serde_json::Value,
67) -> ProcessAwaitOutput {
68 match state {
69 ProcessTerminalState::Completed => ProcessAwaitOutput::Success {
70 value,
71 control: None,
72 },
73 ProcessTerminalState::Failed => ProcessAwaitOutput::Failure {
74 class: crate::ToolFailureClass::Execution,
75 code: "process_failed".to_string(),
76 message: selector_value_to_string(&value),
77 raw: Some(value),
78 control: None,
79 },
80 ProcessTerminalState::Cancelled => ProcessAwaitOutput::Cancelled {
81 message: selector_value_to_string(&value),
82 raw: Some(value),
83 control: None,
84 },
85 }
86}
87
88fn materialize_wake(
89 process_id: &str,
90 sequence: u64,
91 payload: &serde_json::Value,
92 wake: &ProcessWakeSpec,
93) -> Result<Option<ProcessWake>, PluginError> {
94 if let Some(when) = &wake.when {
95 let selected = select_value(payload, when)?;
96 if !selector_value_is_truthy(&selected) {
97 return Ok(None);
98 }
99 }
100 let input = selector_value_to_string(&select_value(payload, &wake.input)?);
101 let dedupe_key = match &wake.dedupe_key {
102 ProcessWakeDedupeKey::EventIdentity => format!("{process_id}:{sequence}"),
103 ProcessWakeDedupeKey::Selector(selector) => {
104 selector_value_to_string(&select_value(payload, selector)?)
105 }
106 ProcessWakeDedupeKey::Const(value) => value.clone(),
107 };
108 Ok(Some(ProcessWake { input, dedupe_key }))
109}
110
111pub(super) fn select_value(
112 payload: &serde_json::Value,
113 selector: &ProcessValueSelector,
114) -> Result<serde_json::Value, PluginError> {
115 match selector {
116 ProcessValueSelector::Payload => Ok(payload.clone()),
117 ProcessValueSelector::Pointer(pointer) => {
118 payload.pointer(pointer).cloned().ok_or_else(|| {
119 PluginError::Session(format!("payload pointer `{pointer}` did not match"))
120 })
121 }
122 ProcessValueSelector::Const(value) => Ok(value.clone()),
123 ProcessValueSelector::Template { template, fields } => {
124 let mut rendered = template.clone();
125 for (name, selector) in fields {
126 let value = select_value(payload, selector)?;
127 rendered =
128 rendered.replace(&format!("{{{name}}}"), &selector_value_to_string(&value));
129 }
130 Ok(serde_json::Value::String(rendered))
131 }
132 ProcessValueSelector::Present(pointer) => {
133 Ok(serde_json::Value::Bool(payload.pointer(pointer).is_some()))
134 }
135 }
136}
137
138fn selector_value_to_string(value: &serde_json::Value) -> String {
139 value
140 .as_str()
141 .map(ToOwned::to_owned)
142 .unwrap_or_else(|| value.to_string())
143}
144
145fn selector_value_is_truthy(value: &serde_json::Value) -> bool {
146 match value {
147 serde_json::Value::Null => false,
148 serde_json::Value::Bool(value) => *value,
149 serde_json::Value::String(value) => !value.is_empty(),
150 serde_json::Value::Array(value) => !value.is_empty(),
151 serde_json::Value::Object(value) => !value.is_empty(),
152 serde_json::Value::Number(_) => true,
153 }
154}