1use serde_json::Value as JsonValue;
7use std::collections::HashMap;
8use std::result::Result as StdResult;
9
10use crate::spec::{WorkflowStep, CallDefinition, EmitDefinition, ListenDefinition, WaitDefinition, RunDefinition, SwitchCase, ForDefinition, ForkDefinition, TryDefinition, RaiseDefinition, ServerlessWorkflow};
11use crate::ir::{ActivityIR, WorkflowIR, ActivityImplementation};
12use kotoba_errors::WorkflowError;
13
/// Stateless parser and converter for Serverless Workflow documents.
///
/// The type carries no data, so it is trivially copyable and can be created
/// through `Default` as well as through its constructor.
#[derive(Debug, Clone, Copy, Default)]
pub struct ServerlessWorkflowParser;
16
17impl ServerlessWorkflowParser {
18 pub fn new() -> Self {
20 Self
21 }
22
23 pub fn parse_json(&self, json: JsonValue) -> StdResult<ServerlessWorkflow, WorkflowError> {
25 serde_json::from_value(json)
26 .map_err(|e| WorkflowError::InvalidDefinition(format!("JSON parsing failed: {}", e)))
27 }
28
29 pub fn parse_yaml(&self, yaml_content: &str) -> StdResult<ServerlessWorkflow, WorkflowError> {
31 serde_yaml::from_str(yaml_content)
32 .map_err(|e| WorkflowError::InvalidDefinition(format!("YAML parsing failed: {}", e)))
33 }
34
35 pub fn convert_to_workflow_ir(&self, sw: ServerlessWorkflow) -> StdResult<WorkflowIR, WorkflowError> {
37 let mut workflow_ir = WorkflowIR {
38 id: format!("{}.{}.{}", sw.document.namespace, sw.document.name, sw.document.version),
39 name: sw.document.name,
40 version: sw.document.version,
41 description: sw.document.description,
42 inputs: Vec::new(),
43 outputs: Vec::new(),
44 strategy: crate::ir::WorkflowStrategyOp::Seq { strategies: Vec::new() },
45 activities: Vec::new(),
46 timeout: None,
47 retry_policy: None,
48 metadata: HashMap::new(),
49 };
50
51 for (index, step) in sw.r#do.iter().enumerate() {
53 let activity = self.convert_step_to_activity(step, index)?;
54 workflow_ir.activities.push(activity);
55 }
56
57 if let Some(title) = sw.document.title {
59 workflow_ir.metadata.insert("title".to_string(), kotoba_core::types::Value::String(title));
60 }
61 if let Some(summary) = sw.document.summary {
62 workflow_ir.metadata.insert("summary".to_string(), kotoba_core::types::Value::String(summary));
63 }
64 workflow_ir.metadata.insert("dsl".to_string(), kotoba_core::types::Value::String(sw.document.dsl));
65 workflow_ir.metadata.insert("namespace".to_string(), kotoba_core::types::Value::String(sw.document.namespace));
66
67 Ok(workflow_ir)
68 }
69
70 fn convert_step_to_activity(&self, step: &crate::spec::WorkflowStep, index: usize) -> StdResult<ActivityIR, WorkflowError> {
72 match step {
73 crate::spec::WorkflowStep::Call { call, .. } => {
74 self.convert_call_step(call, index)
75 }
76 crate::spec::WorkflowStep::Emit { emit, .. } => {
77 self.convert_emit_step(emit, index)
78 }
79 crate::spec::WorkflowStep::Listen { listen, .. } => {
80 self.convert_listen_step(listen, index)
81 }
82 crate::spec::WorkflowStep::Wait { wait, .. } => {
83 self.convert_wait_step(wait, index)
84 }
85 crate::spec::WorkflowStep::Run { run, .. } => {
86 self.convert_run_step(run, index)
87 }
88 crate::spec::WorkflowStep::Switch { switch, .. } => {
89 self.convert_switch_step(switch, index)
90 }
91 crate::spec::WorkflowStep::For { r#for, .. } => {
92 self.convert_for_step(r#for, index)
93 }
94 crate::spec::WorkflowStep::Fork { fork, .. } => {
95 self.convert_fork_step(fork, index)
96 }
97 crate::spec::WorkflowStep::Try { r#try, .. } => {
98 self.convert_try_step(r#try, index)
99 }
100 crate::spec::WorkflowStep::Raise { raise, .. } => {
101 self.convert_raise_step(raise, index)
102 }
103 crate::spec::WorkflowStep::Set { set, .. } => {
104 self.convert_set_step(set, index)
105 }
106 }
107 }
108
109 fn convert_call_step(&self, call: &crate::spec::CallDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
110 let activity_type = match call {
111 crate::spec::CallDefinition::Http { .. } => "http_call",
112 crate::spec::CallDefinition::Grpc { .. } => "grpc_call",
113 crate::spec::CallDefinition::OpenApi { .. } => "openapi_call",
114 crate::spec::CallDefinition::AsyncApi { .. } => "asyncapi_call",
115 };
116
117 let config = serde_json::to_value(call)
118 .map_err(|e| WorkflowError::InvalidDefinition(format!("Call config serialization failed: {}", e)))?;
119
120 Ok(ActivityIR {
121 name: format!("Call Step {}", index),
122 description: Some(format!("Serverless Workflow {} step", activity_type)),
123 inputs: Vec::new(), outputs: Vec::new(), timeout: None,
126 retry_policy: None,
127 implementation: ActivityImplementation::Http {
128 url: "http://localhost".to_string(), method: "GET".to_string(), headers: HashMap::new(),
131 },
132 })
133 }
134
135 fn convert_emit_step(&self, _emit: &crate::spec::EmitDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
136 Ok(ActivityIR {
137 name: format!("Emit Step {}", index),
138 description: Some("Serverless Workflow emit step".to_string()),
139 inputs: Vec::new(),
140 outputs: Vec::new(),
141 timeout: None,
142 retry_policy: None,
143 implementation: ActivityImplementation::Function {
144 function_name: "emit_event".to_string(),
145 },
146 })
147 }
148
149 fn convert_listen_step(&self, _listen: &crate::spec::ListenDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
150 Ok(ActivityIR {
151 name: format!("Listen Step {}", index),
152 description: Some("Serverless Workflow listen step".to_string()),
153 inputs: Vec::new(),
154 outputs: Vec::new(),
155 timeout: None,
156 retry_policy: None,
157 implementation: ActivityImplementation::Function {
158 function_name: "listen_event".to_string(),
159 },
160 })
161 }
162
163 fn convert_wait_step(&self, _wait: &crate::spec::WaitDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
164 Ok(ActivityIR {
165 name: format!("Wait Step {}", index),
166 description: Some("Serverless Workflow wait step".to_string()),
167 inputs: Vec::new(),
168 outputs: Vec::new(),
169 timeout: None,
170 retry_policy: None,
171 implementation: ActivityImplementation::Function {
172 function_name: "wait".to_string(),
173 },
174 })
175 }
176
177 fn convert_run_step(&self, run: &RunDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
178 let function_name = match run {
179 crate::spec::RunDefinition::Container { .. } => "run_container",
180 crate::spec::RunDefinition::Script { .. } => "run_script",
181 crate::spec::RunDefinition::Workflow { .. } => "run_workflow",
182 };
183
184 Ok(ActivityIR {
185 name: format!("Run Step {}", index),
186 description: Some("Serverless Workflow run step".to_string()),
187 inputs: Vec::new(),
188 outputs: Vec::new(),
189 timeout: None,
190 retry_policy: None,
191 implementation: ActivityImplementation::Function {
192 function_name: function_name.to_string(),
193 },
194 })
195 }
196
197 fn convert_switch_step(&self, _switch: &[crate::spec::SwitchCase], index: usize) -> StdResult<ActivityIR, WorkflowError> {
198 Ok(ActivityIR {
199 name: format!("Switch Step {}", index),
200 description: Some("Serverless Workflow switch step".to_string()),
201 inputs: Vec::new(),
202 outputs: Vec::new(),
203 timeout: None,
204 retry_policy: None,
205 implementation: ActivityImplementation::Function {
206 function_name: "switch".to_string(),
207 },
208 })
209 }
210
211 fn convert_for_step(&self, _for_def: &crate::spec::ForDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
212 Ok(ActivityIR {
213 name: format!("For Step {}", index),
214 description: Some("Serverless Workflow for loop step".to_string()),
215 inputs: Vec::new(),
216 outputs: Vec::new(),
217 timeout: None,
218 retry_policy: None,
219 implementation: ActivityImplementation::Function {
220 function_name: "for_loop".to_string(),
221 },
222 })
223 }
224
225 fn convert_fork_step(&self, _fork: &crate::spec::ForkDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
226 Ok(ActivityIR {
227 name: format!("Fork Step {}", index),
228 description: Some("Serverless Workflow fork step".to_string()),
229 inputs: Vec::new(),
230 outputs: Vec::new(),
231 timeout: None,
232 retry_policy: None,
233 implementation: ActivityImplementation::Function {
234 function_name: "fork".to_string(),
235 },
236 })
237 }
238
239 fn convert_try_step(&self, _try_def: &crate::spec::TryDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
240 Ok(ActivityIR {
241 name: format!("Try Step {}", index),
242 description: Some("Serverless Workflow try-catch step".to_string()),
243 inputs: Vec::new(),
244 outputs: Vec::new(),
245 timeout: None,
246 retry_policy: None,
247 implementation: ActivityImplementation::Function {
248 function_name: "try_catch".to_string(),
249 },
250 })
251 }
252
253 fn convert_raise_step(&self, _raise: &crate::spec::RaiseDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
254 Ok(ActivityIR {
255 name: format!("Raise Step {}", index),
256 description: Some("Serverless Workflow raise error step".to_string()),
257 inputs: Vec::new(),
258 outputs: Vec::new(),
259 timeout: None,
260 retry_policy: None,
261 implementation: ActivityImplementation::Function {
262 function_name: "raise_error".to_string(),
263 },
264 })
265 }
266
267 fn convert_set_step(&self, _set: &HashMap<String, serde_json::Value>, index: usize) -> StdResult<ActivityIR, WorkflowError> {
268 Ok(ActivityIR {
269 name: format!("Set Step {}", index),
270 description: Some("Serverless Workflow set variable step".to_string()),
271 inputs: Vec::new(),
272 outputs: Vec::new(),
273 timeout: None,
274 retry_policy: None,
275 implementation: ActivityImplementation::Function {
276 function_name: "set_variable".to_string(),
277 },
278 })
279 }
280}