1use indexmap::IndexMap;
2use serde::{Deserialize, Serialize};
3use serde_json::{Map, Value};
4
5use crate::{
6 error::{FlowError, FlowErrorLocation, Result},
7 loader::load_ygtc_from_str,
8 model::{FlowDoc, NodeDoc},
9};
10
11#[derive(Debug, Clone)]
14pub struct FlowIr {
15 pub id: String,
16 pub kind: String,
17 pub schema_version: Option<u32>,
18 pub entrypoints: IndexMap<String, String>,
19 pub nodes: IndexMap<String, NodeIr>,
20}
21
22#[derive(Debug, Clone)]
23pub struct NodeIr {
24 pub id: String,
25 pub operation: String,
26 pub payload: Value,
27 pub output: Value,
28 pub routing: Vec<Route>,
29 pub telemetry: Option<Value>,
30}
31
32#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
33pub struct Route {
34 #[serde(default, skip_serializing_if = "Option::is_none")]
35 pub to: Option<String>,
36 #[serde(default, skip_serializing_if = "is_false")]
37 pub out: bool,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
39 pub status: Option<String>,
40 #[serde(default, skip_serializing_if = "is_false")]
41 pub reply: bool,
42}
43
44fn is_false(value: &bool) -> bool {
45 !*value
46}
47
48impl FlowIr {
49 pub fn from_doc(doc: FlowDoc) -> Result<Self> {
50 let schema_version = doc.schema_version;
51 let entrypoints = resolve_entrypoints(&doc);
52 let mut nodes = IndexMap::new();
53 for (id, node_doc) in doc.nodes {
54 let (operation, payload) = extract_operation(&node_doc, &id)?;
55 let routing = parse_routing(&node_doc, &id)?;
56 let output = node_doc
57 .raw
58 .get("output")
59 .cloned()
60 .unwrap_or_else(|| Value::Object(Map::new()));
61 nodes.insert(
62 id.clone(),
63 NodeIr {
64 id: id.clone(),
65 operation,
66 payload,
67 output,
68 routing,
69 telemetry: node_doc
70 .telemetry
71 .clone()
72 .and_then(|t| serde_json::to_value(t).ok()),
73 },
74 );
75 }
76
77 Ok(FlowIr {
78 id: doc.id,
79 kind: doc.flow_type,
80 schema_version,
81 entrypoints,
82 nodes,
83 })
84 }
85
86 pub fn to_doc(&self) -> Result<FlowDoc> {
87 let mut nodes: IndexMap<String, NodeDoc> = IndexMap::new();
88 for (id, node_ir) in &self.nodes {
89 let mut raw = IndexMap::new();
90 raw.insert(node_ir.operation.clone(), node_ir.payload.clone());
91 if !node_ir.output.is_object()
92 || !node_ir
93 .output
94 .as_object()
95 .map(|m| m.is_empty())
96 .unwrap_or(false)
97 {
98 raw.insert("output".to_string(), node_ir.output.clone());
99 }
100 let routing_value =
101 serde_json::to_value(&node_ir.routing).map_err(|e| FlowError::Internal {
102 message: format!("serialize routing for node '{id}': {e}"),
103 location: FlowErrorLocation::at_path(format!("nodes.{id}.routing")),
104 })?;
105 let routing_yaml = if node_ir.routing.len() == 1
106 && node_ir.routing[0].out
107 && node_ir.routing[0].to.is_none()
108 && !node_ir.routing[0].reply
109 && node_ir.routing[0].status.is_none()
110 {
111 Value::String("out".to_string())
112 } else if node_ir.routing.len() == 1
113 && node_ir.routing[0].reply
114 && node_ir.routing[0].to.is_none()
115 && !node_ir.routing[0].out
116 && node_ir.routing[0].status.is_none()
117 {
118 Value::String("reply".to_string())
119 } else {
120 routing_value
121 };
122 nodes.insert(
123 id.clone(),
124 NodeDoc {
125 routing: routing_yaml,
126 telemetry: node_ir
127 .telemetry
128 .as_ref()
129 .and_then(|t| serde_json::from_value(t.clone()).ok()),
130 operation: Some(node_ir.operation.clone()),
131 payload: node_ir.payload.clone(),
132 raw,
133 },
134 );
135 }
136
137 Ok(FlowDoc {
138 id: self.id.clone(),
139 title: None,
140 description: None,
141 flow_type: self.kind.clone(),
142 start: self.entrypoints.get("default").cloned(),
143 parameters: Value::Object(Map::new()),
144 tags: Vec::new(),
145 schema_version: self.schema_version,
146 entrypoints: IndexMap::new(),
147 nodes,
148 })
149 }
150}
151
152fn resolve_entrypoints(doc: &FlowDoc) -> IndexMap<String, String> {
153 let mut entries = IndexMap::new();
154 if let Some(start) = &doc.start {
155 entries.insert("default".to_string(), start.clone());
156 } else if doc.nodes.contains_key("in") {
157 entries.insert("default".to_string(), "in".to_string());
158 } else if let Some(first) = doc.nodes.keys().next() {
159 entries.insert("default".to_string(), first.clone());
160 }
161 for (k, v) in &doc.entrypoints {
162 if let Some(target) = v.as_str() {
163 entries.insert(k.clone(), target.to_string());
164 }
165 }
166 entries
167}
168
169fn parse_routing(node: &NodeDoc, node_id: &str) -> Result<Vec<Route>> {
170 if node.routing.is_null() {
171 return Ok(Vec::new());
172 }
173 if let Some(s) = node.routing.as_str() {
174 return match s {
175 "out" => Ok(vec![Route {
176 out: true,
177 ..Route::default()
178 }]),
179 "reply" => Ok(vec![Route {
180 reply: true,
181 ..Route::default()
182 }]),
183 other => Err(FlowError::Routing {
184 node_id: node_id.to_string(),
185 message: format!("unsupported routing shorthand '{other}'"),
186 location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
187 }),
188 };
189 }
190 #[derive(serde::Deserialize)]
191 struct RouteDoc {
192 #[serde(default)]
193 to: Option<String>,
194 #[serde(default)]
195 out: Option<bool>,
196 #[serde(default)]
197 status: Option<String>,
198 #[serde(default)]
199 reply: Option<bool>,
200 }
201
202 let routes: Vec<RouteDoc> =
203 serde_json::from_value(node.routing.clone()).map_err(|e| FlowError::Internal {
204 message: format!("routing decode for node '{node_id}': {e}"),
205 location: FlowErrorLocation::at_path(format!("nodes.{node_id}.routing")),
206 })?;
207
208 Ok(routes
209 .into_iter()
210 .map(|r| Route {
211 to: r.to,
212 out: r.out.unwrap_or(false),
213 status: r.status,
214 reply: r.reply.unwrap_or(false),
215 })
216 .collect())
217}
218
219pub fn parse_flow_to_ir(yaml: &str) -> Result<FlowIr> {
221 let doc = load_ygtc_from_str(yaml)?;
222 FlowIr::from_doc(doc)
223}
224
225fn extract_operation(node: &NodeDoc, node_id: &str) -> Result<(String, Value)> {
226 let reserved = [
227 "routing",
228 "telemetry",
229 "output",
230 "retry",
231 "timeout",
232 "when",
233 "annotations",
234 "meta",
235 ];
236 if let Some(exec) = node.raw.get("component.exec") {
237 let op = node
238 .raw
239 .get("operation")
240 .and_then(Value::as_str)
241 .or(node.operation.as_deref())
242 .unwrap_or("");
243 if op.trim().is_empty() {
244 return Err(FlowError::Internal {
245 message: format!("node '{node_id}' missing operation key"),
246 location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
247 });
248 }
249 return Ok((op.to_string(), exec.clone()));
250 }
251 let mut op_key: Option<String> = None;
252 let mut payload: Option<Value> = None;
253 for (k, v) in &node.raw {
254 if reserved.contains(&k.as_str()) {
255 continue;
256 }
257 if op_key.is_some() {
258 return Err(FlowError::Internal {
259 message: format!(
260 "node '{node_id}' must have exactly one operation key, found multiple"
261 ),
262 location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
263 });
264 }
265 op_key = Some(k.clone());
266 payload = Some(v.clone());
267 }
268 if let (Some(k), Some(v)) = (op_key, payload) {
269 return Ok((k, v));
270 }
271
272 if let Some(op) = &node.operation {
273 return Ok((op.clone(), node.payload.clone()));
274 }
275
276 Err(FlowError::Internal {
277 message: format!("node '{node_id}' missing operation key"),
278 location: FlowErrorLocation::at_path(format!("nodes.{node_id}")),
279 })
280}