kotoba_workflow/
parser.rs

1//! Serverless Workflow Parser
2//!
3//! Parses Serverless Workflow DSL (https://serverlessworkflow.io/) from JSON/YAML
4//! and converts to WorkflowIR for execution.
5
6use serde_json::Value as JsonValue;
7use std::collections::HashMap;
8use std::result::Result as StdResult;
9
10use crate::spec::{WorkflowStep, CallDefinition, EmitDefinition, ListenDefinition, WaitDefinition, RunDefinition, SwitchCase, ForDefinition, ForkDefinition, TryDefinition, RaiseDefinition, ServerlessWorkflow};
11use crate::ir::{ActivityIR, WorkflowIR, ActivityImplementation};
12use kotoba_errors::WorkflowError;
13
14/// Serverless Workflow Parser
15pub struct ServerlessWorkflowParser;
16
17impl ServerlessWorkflowParser {
18    /// 新しいパーサーを作成
19    pub fn new() -> Self {
20        Self
21    }
22
23    /// JSONからServerlessWorkflowをパース
24    pub fn parse_json(&self, json: JsonValue) -> StdResult<ServerlessWorkflow, WorkflowError> {
25        serde_json::from_value(json)
26            .map_err(|e| WorkflowError::InvalidDefinition(format!("JSON parsing failed: {}", e)))
27    }
28
29    /// YAML文字列からServerlessWorkflowをパース
30    pub fn parse_yaml(&self, yaml_content: &str) -> StdResult<ServerlessWorkflow, WorkflowError> {
31        serde_yaml::from_str(yaml_content)
32            .map_err(|e| WorkflowError::InvalidDefinition(format!("YAML parsing failed: {}", e)))
33    }
34
35    /// ServerlessWorkflowをWorkflowIRに変換
36    pub fn convert_to_workflow_ir(&self, sw: ServerlessWorkflow) -> StdResult<WorkflowIR, WorkflowError> {
37        let mut workflow_ir = WorkflowIR {
38            id: format!("{}.{}.{}", sw.document.namespace, sw.document.name, sw.document.version),
39            name: sw.document.name,
40            version: sw.document.version,
41            description: sw.document.description,
42            inputs: Vec::new(),
43            outputs: Vec::new(),
44            strategy: crate::ir::WorkflowStrategyOp::Seq { strategies: Vec::new() },
45            activities: Vec::new(),
46            timeout: None,
47            retry_policy: None,
48            metadata: HashMap::new(),
49        };
50
51        // Convert workflow steps to activities
52        for (index, step) in sw.r#do.iter().enumerate() {
53            let activity = self.convert_step_to_activity(step, index)?;
54            workflow_ir.activities.push(activity);
55        }
56
57        // Add metadata
58        if let Some(title) = sw.document.title {
59            workflow_ir.metadata.insert("title".to_string(), kotoba_core::types::Value::String(title));
60        }
61        if let Some(summary) = sw.document.summary {
62            workflow_ir.metadata.insert("summary".to_string(), kotoba_core::types::Value::String(summary));
63        }
64        workflow_ir.metadata.insert("dsl".to_string(), kotoba_core::types::Value::String(sw.document.dsl));
65        workflow_ir.metadata.insert("namespace".to_string(), kotoba_core::types::Value::String(sw.document.namespace));
66
67        Ok(workflow_ir)
68    }
69
70    /// Convert workflow step to activity
71    fn convert_step_to_activity(&self, step: &crate::spec::WorkflowStep, index: usize) -> StdResult<ActivityIR, WorkflowError> {
72        match step {
73            crate::spec::WorkflowStep::Call { call, .. } => {
74                self.convert_call_step(call, index)
75            }
76            crate::spec::WorkflowStep::Emit { emit, .. } => {
77                self.convert_emit_step(emit, index)
78            }
79            crate::spec::WorkflowStep::Listen { listen, .. } => {
80                self.convert_listen_step(listen, index)
81            }
82            crate::spec::WorkflowStep::Wait { wait, .. } => {
83                self.convert_wait_step(wait, index)
84            }
85            crate::spec::WorkflowStep::Run { run, .. } => {
86                self.convert_run_step(run, index)
87            }
88            crate::spec::WorkflowStep::Switch { switch, .. } => {
89                self.convert_switch_step(switch, index)
90            }
91            crate::spec::WorkflowStep::For { r#for, .. } => {
92                self.convert_for_step(r#for, index)
93            }
94            crate::spec::WorkflowStep::Fork { fork, .. } => {
95                self.convert_fork_step(fork, index)
96            }
97            crate::spec::WorkflowStep::Try { r#try, .. } => {
98                self.convert_try_step(r#try, index)
99            }
100            crate::spec::WorkflowStep::Raise { raise, .. } => {
101                self.convert_raise_step(raise, index)
102            }
103            crate::spec::WorkflowStep::Set { set, .. } => {
104                self.convert_set_step(set, index)
105            }
106        }
107    }
108
109    fn convert_call_step(&self, call: &crate::spec::CallDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
110        let activity_type = match call {
111            crate::spec::CallDefinition::Http { .. } => "http_call",
112            crate::spec::CallDefinition::Grpc { .. } => "grpc_call",
113            crate::spec::CallDefinition::OpenApi { .. } => "openapi_call",
114            crate::spec::CallDefinition::AsyncApi { .. } => "asyncapi_call",
115        };
116
117        let config = serde_json::to_value(call)
118            .map_err(|e| WorkflowError::InvalidDefinition(format!("Call config serialization failed: {}", e)))?;
119
120        Ok(ActivityIR {
121            name: format!("Call Step {}", index),
122            description: Some(format!("Serverless Workflow {} step", activity_type)),
123            inputs: Vec::new(), // TODO: 実際の入力パラメータを解析
124            outputs: Vec::new(), // TODO: 実際の出力パラメータを解析
125            timeout: None,
126            retry_policy: None,
127            implementation: ActivityImplementation::Http {
128                url: "http://localhost".to_string(), // TODO: 実際のURLを解析
129                method: "GET".to_string(), // TODO: 実際のメソッドを解析
130                headers: HashMap::new(),
131            },
132        })
133    }
134
135    fn convert_emit_step(&self, _emit: &crate::spec::EmitDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
136        Ok(ActivityIR {
137            name: format!("Emit Step {}", index),
138            description: Some("Serverless Workflow emit step".to_string()),
139            inputs: Vec::new(),
140            outputs: Vec::new(),
141            timeout: None,
142            retry_policy: None,
143            implementation: ActivityImplementation::Function {
144                function_name: "emit_event".to_string(),
145            },
146        })
147    }
148
149    fn convert_listen_step(&self, _listen: &crate::spec::ListenDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
150        Ok(ActivityIR {
151            name: format!("Listen Step {}", index),
152            description: Some("Serverless Workflow listen step".to_string()),
153            inputs: Vec::new(),
154            outputs: Vec::new(),
155            timeout: None,
156            retry_policy: None,
157            implementation: ActivityImplementation::Function {
158                function_name: "listen_event".to_string(),
159            },
160        })
161    }
162
163    fn convert_wait_step(&self, _wait: &crate::spec::WaitDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
164        Ok(ActivityIR {
165            name: format!("Wait Step {}", index),
166            description: Some("Serverless Workflow wait step".to_string()),
167            inputs: Vec::new(),
168            outputs: Vec::new(),
169            timeout: None,
170            retry_policy: None,
171            implementation: ActivityImplementation::Function {
172                function_name: "wait".to_string(),
173            },
174        })
175    }
176
177    fn convert_run_step(&self, run: &RunDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
178        let function_name = match run {
179            crate::spec::RunDefinition::Container { .. } => "run_container",
180            crate::spec::RunDefinition::Script { .. } => "run_script",
181            crate::spec::RunDefinition::Workflow { .. } => "run_workflow",
182        };
183
184        Ok(ActivityIR {
185            name: format!("Run Step {}", index),
186            description: Some("Serverless Workflow run step".to_string()),
187            inputs: Vec::new(),
188            outputs: Vec::new(),
189            timeout: None,
190            retry_policy: None,
191            implementation: ActivityImplementation::Function {
192                function_name: function_name.to_string(),
193            },
194        })
195    }
196
197    fn convert_switch_step(&self, _switch: &[crate::spec::SwitchCase], index: usize) -> StdResult<ActivityIR, WorkflowError> {
198        Ok(ActivityIR {
199            name: format!("Switch Step {}", index),
200            description: Some("Serverless Workflow switch step".to_string()),
201            inputs: Vec::new(),
202            outputs: Vec::new(),
203            timeout: None,
204            retry_policy: None,
205            implementation: ActivityImplementation::Function {
206                function_name: "switch".to_string(),
207            },
208        })
209    }
210
211    fn convert_for_step(&self, _for_def: &crate::spec::ForDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
212        Ok(ActivityIR {
213            name: format!("For Step {}", index),
214            description: Some("Serverless Workflow for loop step".to_string()),
215            inputs: Vec::new(),
216            outputs: Vec::new(),
217            timeout: None,
218            retry_policy: None,
219            implementation: ActivityImplementation::Function {
220                function_name: "for_loop".to_string(),
221            },
222        })
223    }
224
225    fn convert_fork_step(&self, _fork: &crate::spec::ForkDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
226        Ok(ActivityIR {
227            name: format!("Fork Step {}", index),
228            description: Some("Serverless Workflow fork step".to_string()),
229            inputs: Vec::new(),
230            outputs: Vec::new(),
231            timeout: None,
232            retry_policy: None,
233            implementation: ActivityImplementation::Function {
234                function_name: "fork".to_string(),
235            },
236        })
237    }
238
239    fn convert_try_step(&self, _try_def: &crate::spec::TryDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
240        Ok(ActivityIR {
241            name: format!("Try Step {}", index),
242            description: Some("Serverless Workflow try-catch step".to_string()),
243            inputs: Vec::new(),
244            outputs: Vec::new(),
245            timeout: None,
246            retry_policy: None,
247            implementation: ActivityImplementation::Function {
248                function_name: "try_catch".to_string(),
249            },
250        })
251    }
252
253    fn convert_raise_step(&self, _raise: &crate::spec::RaiseDefinition, index: usize) -> StdResult<ActivityIR, WorkflowError> {
254        Ok(ActivityIR {
255            name: format!("Raise Step {}", index),
256            description: Some("Serverless Workflow raise error step".to_string()),
257            inputs: Vec::new(),
258            outputs: Vec::new(),
259            timeout: None,
260            retry_policy: None,
261            implementation: ActivityImplementation::Function {
262                function_name: "raise_error".to_string(),
263            },
264        })
265    }
266
267    fn convert_set_step(&self, _set: &HashMap<String, serde_json::Value>, index: usize) -> StdResult<ActivityIR, WorkflowError> {
268        Ok(ActivityIR {
269            name: format!("Set Step {}", index),
270            description: Some("Serverless Workflow set variable step".to_string()),
271            inputs: Vec::new(),
272            outputs: Vec::new(),
273            timeout: None,
274            retry_policy: None,
275            implementation: ActivityImplementation::Function {
276                function_name: "set_variable".to_string(),
277            },
278        })
279    }
280}