1use crate::connector::{Connector, ConnectorStepState, ConnectorVersion, JobInstanceState};
2use crate::step::ConnectorStep;
3use crate::workflow::Workflow;
4use crate::workflow_instance::WorkflowInstance;
5use anyhow::*;
6use rion::Rion;
7use serde::{Deserialize, Serialize};
8use serde_json::{json, Value};
9use std::collections::HashMap;
10
11#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug, Default)]
12pub struct JobTaskContext {
13 pub current_state: ConnectorStepState,
14 pub state: JobInstanceState,
15}
16
17pub fn resolve_variables(template: &str, ctx: TaskContext) -> Result<String> {
19 let result = Rion::resolve_var_str(template, &ctx)?;
20 Ok(result)
21}
22
23pub fn extract_step_input<T>(properties: &Value) -> Result<T>
25where
26 T: serde::de::DeserializeOwned,
27{
28 let value = serde_json::from_value::<HashMap<String, Value>>(properties.clone())?;
29 let d = json!({});
30 let raw_input = value.get("input").unwrap_or(&d).clone();
31 let input = serde_json::from_value::<T>(raw_input)?;
32 Ok(input)
33}
34
35pub fn extract_step_output<T>(properties: &Value) -> Result<T>
37where
38 T: serde::de::DeserializeOwned,
39{
40 let value = serde_json::from_value::<HashMap<String, Value>>(properties.clone())?;
41 let d = json!({});
42 let raw_input = value.get("output").unwrap_or(&d).clone();
43 let output = serde_json::from_value::<T>(raw_input)?;
44 Ok(output)
45}
46
47pub fn convert_step_property<T>(value: &Value) -> Result<T>
49where
50 T: serde::de::DeserializeOwned,
51{
52 let output = serde_json::from_value::<T>(value.clone())?;
53 Ok(output)
54}
55
56pub fn extract_step_input_output<I, O>(properties: &Value) -> Result<(I, O)>
58where
59 I: serde::de::DeserializeOwned,
60 O: serde::de::DeserializeOwned,
61{
62 let output = extract_step_output::<O>(properties)?;
63 let input = extract_step_input::<I>(properties)?;
64
65 Ok((input, output))
66}
67
68#[derive(PartialEq, Eq, Serialize, Deserialize, Clone, Debug)]
69pub struct TaskContext {
70 pub previous_task_id: Option<String>,
72
73 pub current_task_id: Option<String>,
75
76 pub next_task_id: Option<String>,
78
79 pub steps: HashMap<String, ConnectorStepState>,
80
81 pub env: HashMap<String, String>,
82
83 pub input: HashMap<String, Value>,
84
85 pub job: WorkflowInstance,
86
87 pub step: ConnectorStep,
88
89 pub workflow: Workflow,
90
91 pub connector: Connector,
92
93 pub version: ConnectorVersion,
94}
95
96impl TaskContext {
97 pub fn to_json(&self) -> Value {
99 serde_json::to_value(self.clone()).unwrap_or_default()
100 }
101
102 pub fn resolve_variables(&self, template: String) -> Result<String> {
104 resolve_variables(template.as_str(), self.clone())
105 }
106
107 pub fn resolve_input_variables<T>(&self, template: HashMap<String, Value>) -> Result<T>
109 where
110 T: Serialize + serde::de::DeserializeOwned,
111 {
112 let j_str = serde_json::to_string::<HashMap<String, Value>>(&template)?;
113 let val = Rion::resolve_var_str(j_str.as_str(), &self.clone())?;
114 let output = serde_json::from_str::<T>(val.as_str())?;
115 Ok(output)
116 }
117
118 pub fn value_to_output<T>(value: T) -> Result<HashMap<String, Value>>
120 where
121 T: Serialize + serde::de::DeserializeOwned,
122 {
123 let raw_value = serde_json::to_value(value)?;
124 let output = serde_json::from_value::<HashMap<String, Value>>(raw_value)?;
125 Ok(output)
126 }
127}