wakflo_core/
context.rs

1use crate::connector::{Connector, ConnectorStepState, ConnectorVersion, JobInstanceState};
2use crate::step::ConnectorStep;
3use crate::workflow::Workflow;
4use crate::workflow_instance::WorkflowInstance;
5use anyhow::*;
6use rion::Rion;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use std::collections::HashMap;
10
11#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Default)]
12pub struct JobTaskContext {
13    pub current_state: ConnectorStepState,
14    pub state: JobInstanceState,
15}
16
17/// resolve rion variables
18pub fn resolve_variables(template: &str, ctx: TaskContext) -> Result<String> {
19    let result = Rion::resolve_var_str(template, &ctx)?;
20    Ok(result)
21}
22
23/// extract step input
24pub fn extract_step_input<T>(properties: &Value) -> Result<T>
25where
26    T: serde::de::DeserializeOwned,
27{
28    let value = serde_json::from_value::<HashMap<String, Value>>(properties.clone())?;
29    let d = json!({});
30    let raw_input = value.get("input").unwrap_or(&d).clone();
31    let input = serde_json::from_value::<T>(raw_input)?;
32    Ok(input)
33}
34
35/// extract step output
36pub fn extract_step_output<T>(properties: &Value) -> Result<T>
37where
38    T: serde::de::DeserializeOwned,
39{
40    let value = serde_json::from_value::<HashMap<String, Value>>(properties.clone())?;
41    let d = json!({});
42    let raw_input = value.get("output").unwrap_or(&d).clone();
43    let output = serde_json::from_value::<T>(raw_input)?;
44    Ok(output)
45}
46
47/// extract step output
48pub fn convert_step_property<T>(value: &Value) -> Result<T>
49where
50    T: serde::de::DeserializeOwned,
51{
52    let output = serde_json::from_value::<T>(value.clone())?;
53    Ok(output)
54}
55
56/// extract step input / output
57pub fn extract_step_input_output<I, O>(properties: &Value) -> Result<(I, O)>
58where
59    I: serde::de::DeserializeOwned,
60    O: serde::de::DeserializeOwned,
61{
62    let output = extract_step_output::<O>(properties)?;
63    let input = extract_step_input::<I>(properties)?;
64
65    Ok((input, output))
66}
67
68#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug)]
69pub struct TaskContext {
70    /// Connector that owns this schema instance
71    pub previous_task_id: Option<String>,
72
73    /// Connector that owns this schema instance
74    pub current_task_id: Option<String>,
75
76    /// Connector that owns this schema instance
77    pub next_task_id: Option<String>,
78
79    pub steps: HashMap<String, ConnectorStepState>,
80
81    pub env: HashMap<String, String>,
82
83    pub input: HashMap<String, Value>,
84
85    pub job: WorkflowInstance,
86
87    pub step: ConnectorStep,
88
89    pub workflow: Workflow,
90
91    pub connector: Connector,
92
93    pub version: ConnectorVersion,
94}
95
96impl TaskContext {
97    /// converts job to json
98    pub fn to_json(&self) -> Value {
99        serde_json::to_value(self.clone()).unwrap_or_default()
100    }
101
102    /// gets rion with context applied
103    pub fn resolve_variables(&self, template: String) -> Result<String> {
104        resolve_variables(template.as_str(), self.clone())
105    }
106
107    /// gets rion with context applied
108    pub fn resolve_input_variables<T>(&self, template: HashMap<String, Value>) -> Result<T>
109    where
110        T: Serialize + serde::de::DeserializeOwned,
111    {
112        let j_str = serde_json::to_string::<HashMap<String, Value>>(&template)?;
113        let val = Rion::resolve_var_str(j_str.as_str(), &self.clone())?;
114        let output = serde_json::from_str::<T>(val.as_str())?;
115        Ok(output)
116    }
117
118    /// gets rion with context applied
119    pub fn value_to_output<T>(value: T) -> Result<HashMap<String, Value>>
120    where
121        T: Serialize + serde::de::DeserializeOwned,
122    {
123        let raw_value = serde_json::to_value(value)?;
124        let output = serde_json::from_value::<HashMap<String, Value>>(raw_value)?;
125        Ok(output)
126    }
127}