Skip to main content

lash_core/runtime/process/
materialization.rs

1use crate::plugin::PluginError;
2
3use super::events::{
4    ProcessAwaitOutput, ProcessEventSemantics, ProcessEventSemanticsSpec, ProcessTerminalSemantics,
5    ProcessTerminalSpec, ProcessTerminalState, ProcessValueSelector, ProcessWake,
6    ProcessWakeDedupeKey, ProcessWakeSpec,
7};
8
9pub fn materialize_process_event_semantics(
10    process_id: &str,
11    sequence: u64,
12    payload: &serde_json::Value,
13    spec: &ProcessEventSemanticsSpec,
14) -> Result<ProcessEventSemantics, PluginError> {
15    materialize_event_semantics(process_id, sequence, payload, spec)
16}
17
18pub(super) fn materialize_event_semantics(
19    process_id: &str,
20    sequence: u64,
21    payload: &serde_json::Value,
22    spec: &ProcessEventSemanticsSpec,
23) -> Result<ProcessEventSemantics, PluginError> {
24    let terminal = spec
25        .terminal
26        .as_ref()
27        .map(|terminal| materialize_terminal_semantics(payload, terminal))
28        .transpose()?;
29    let wake = spec
30        .wake
31        .as_ref()
32        .map(|wake| materialize_wake(process_id, sequence, payload, wake))
33        .transpose()?
34        .flatten();
35    Ok(ProcessEventSemantics { terminal, wake })
36}
37
38fn materialize_terminal_semantics(
39    payload: &serde_json::Value,
40    terminal: &ProcessTerminalSpec,
41) -> Result<ProcessTerminalSemantics, PluginError> {
42    let await_output = match &terminal.await_output {
43        Some(selector) => {
44            let selected = select_value(payload, selector)?;
45            serde_json::from_value::<ProcessAwaitOutput>(selected.clone())
46                .unwrap_or_else(|_| selected_value_to_await_output(terminal.state, selected))
47        }
48        None if terminal.state == ProcessTerminalState::Completed => ProcessAwaitOutput::Success {
49            value: payload.clone(),
50            control: None,
51        },
52        None => {
53            return Err(PluginError::Session(
54                "failed or cancelled terminal events must declare await output".to_string(),
55            ));
56        }
57    };
58    Ok(ProcessTerminalSemantics {
59        state: terminal.state,
60        await_output,
61    })
62}
63
64fn selected_value_to_await_output(
65    state: ProcessTerminalState,
66    value: serde_json::Value,
67) -> ProcessAwaitOutput {
68    match state {
69        ProcessTerminalState::Completed => ProcessAwaitOutput::Success {
70            value,
71            control: None,
72        },
73        ProcessTerminalState::Failed => ProcessAwaitOutput::Failure {
74            class: crate::ToolFailureClass::Execution,
75            code: "process_failed".to_string(),
76            message: selector_value_to_string(&value),
77            raw: Some(value),
78            control: None,
79        },
80        ProcessTerminalState::Cancelled => ProcessAwaitOutput::Cancelled {
81            message: selector_value_to_string(&value),
82            raw: Some(value),
83            control: None,
84        },
85    }
86}
87
88fn materialize_wake(
89    process_id: &str,
90    sequence: u64,
91    payload: &serde_json::Value,
92    wake: &ProcessWakeSpec,
93) -> Result<Option<ProcessWake>, PluginError> {
94    if let Some(when) = &wake.when {
95        let selected = select_value(payload, when)?;
96        if !selector_value_is_truthy(&selected) {
97            return Ok(None);
98        }
99    }
100    let input = selector_value_to_string(&select_value(payload, &wake.input)?);
101    let dedupe_key = match &wake.dedupe_key {
102        ProcessWakeDedupeKey::EventIdentity => format!("{process_id}:{sequence}"),
103        ProcessWakeDedupeKey::Selector(selector) => {
104            selector_value_to_string(&select_value(payload, selector)?)
105        }
106        ProcessWakeDedupeKey::Const(value) => value.clone(),
107    };
108    Ok(Some(ProcessWake { input, dedupe_key }))
109}
110
111pub(super) fn select_value(
112    payload: &serde_json::Value,
113    selector: &ProcessValueSelector,
114) -> Result<serde_json::Value, PluginError> {
115    match selector {
116        ProcessValueSelector::Payload => Ok(payload.clone()),
117        ProcessValueSelector::Pointer(pointer) => {
118            payload.pointer(pointer).cloned().ok_or_else(|| {
119                PluginError::Session(format!("payload pointer `{pointer}` did not match"))
120            })
121        }
122        ProcessValueSelector::Const(value) => Ok(value.clone()),
123        ProcessValueSelector::Template { template, fields } => {
124            let mut rendered = template.clone();
125            for (name, selector) in fields {
126                let value = select_value(payload, selector)?;
127                rendered =
128                    rendered.replace(&format!("{{{name}}}"), &selector_value_to_string(&value));
129            }
130            Ok(serde_json::Value::String(rendered))
131        }
132        ProcessValueSelector::Present(pointer) => {
133            Ok(serde_json::Value::Bool(payload.pointer(pointer).is_some()))
134        }
135    }
136}
137
138fn selector_value_to_string(value: &serde_json::Value) -> String {
139    value
140        .as_str()
141        .map(ToOwned::to_owned)
142        .unwrap_or_else(|| value.to_string())
143}
144
145fn selector_value_is_truthy(value: &serde_json::Value) -> bool {
146    match value {
147        serde_json::Value::Null => false,
148        serde_json::Value::Bool(value) => *value,
149        serde_json::Value::String(value) => !value.is_empty(),
150        serde_json::Value::Array(value) => !value.is_empty(),
151        serde_json::Value::Object(value) => !value.is_empty(),
152        serde_json::Value::Number(_) => true,
153    }
154}